broker_test.go 29 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999
  1. package sarama
  2. import (
  3. "bytes"
  4. "errors"
  5. "fmt"
  6. "net"
  7. "reflect"
  8. "testing"
  9. "time"
  10. "github.com/rcrowley/go-metrics"
  11. "gopkg.in/jcmturner/gokrb5.v7/krberror"
  12. )
  13. func ExampleBroker() {
  14. broker := NewBroker("localhost:9092")
  15. err := broker.Open(nil)
  16. if err != nil {
  17. panic(err)
  18. }
  19. request := MetadataRequest{Topics: []string{"myTopic"}}
  20. response, err := broker.GetMetadata(&request)
  21. if err != nil {
  22. _ = broker.Close()
  23. panic(err)
  24. }
  25. fmt.Println("There are", len(response.Topics), "topics active in the cluster.")
  26. if err = broker.Close(); err != nil {
  27. panic(err)
  28. }
  29. }
  30. type mockEncoder struct {
  31. bytes []byte
  32. }
  33. func (m mockEncoder) encode(pe packetEncoder) error {
  34. return pe.putRawBytes(m.bytes)
  35. }
  // brokerMetrics bundles the two byte counts handed to the mock broker's
  // notifier callback. Field order matters: values are built positionally.
  type brokerMetrics struct {
  	bytesRead, bytesWritten int
  }
  40. func TestBrokerAccessors(t *testing.T) {
  41. broker := NewBroker("abc:123")
  42. if broker.ID() != -1 {
  43. t.Error("New broker didn't have an ID of -1.")
  44. }
  45. if broker.Addr() != "abc:123" {
  46. t.Error("New broker didn't have the correct address")
  47. }
  48. if broker.Rack() != "" {
  49. t.Error("New broker didn't have an unknown rack.")
  50. }
  51. broker.id = 34
  52. if broker.ID() != 34 {
  53. t.Error("Manually setting broker ID did not take effect.")
  54. }
  55. rack := "dc1"
  56. broker.rack = &rack
  57. if broker.Rack() != rack {
  58. t.Error("Manually setting broker rack did not take effect.")
  59. }
  60. }
  61. func TestSimpleBrokerCommunication(t *testing.T) {
  62. for _, tt := range brokerTestTable {
  63. Logger.Printf("Testing broker communication for %s", tt.name)
  64. mb := NewMockBroker(t, 0)
  65. mb.Returns(&mockEncoder{tt.response})
  66. pendingNotify := make(chan brokerMetrics)
  67. // Register a callback to be notified about successful requests
  68. mb.SetNotifier(func(bytesRead, bytesWritten int) {
  69. pendingNotify <- brokerMetrics{bytesRead, bytesWritten}
  70. })
  71. broker := NewBroker(mb.Addr())
  72. // Set the broker id in order to validate local broker metrics
  73. broker.id = 0
  74. conf := NewConfig()
  75. conf.Version = tt.version
  76. err := broker.Open(conf)
  77. if err != nil {
  78. t.Fatal(err)
  79. }
  80. tt.runner(t, broker)
  81. // Wait up to 500 ms for the remote broker to process the request and
  82. // notify us about the metrics
  83. timeout := 500 * time.Millisecond
  84. select {
  85. case mockBrokerMetrics := <-pendingNotify:
  86. validateBrokerMetrics(t, broker, mockBrokerMetrics)
  87. case <-time.After(timeout):
  88. t.Errorf("No request received for: %s after waiting for %v", tt.name, timeout)
  89. }
  90. mb.Close()
  91. err = broker.Close()
  92. if err != nil {
  93. t.Error(err)
  94. }
  95. }
  96. }
  97. var ErrTokenFailure = errors.New("Failure generating token")
  98. type TokenProvider struct {
  99. accessToken *AccessToken
  100. err error
  101. }
  102. func (t *TokenProvider) Token() (*AccessToken, error) {
  103. return t.accessToken, t.err
  104. }
  105. func newTokenProvider(token *AccessToken, err error) *TokenProvider {
  106. return &TokenProvider{
  107. accessToken: token,
  108. err: err,
  109. }
  110. }
  111. func TestSASLOAuthBearer(t *testing.T) {
  112. testTable := []struct {
  113. name string
  114. authidentity string
  115. mockSASLHandshakeResponse MockResponse // Mock SaslHandshakeRequest response from broker
  116. mockSASLAuthResponse MockResponse // Mock SaslAuthenticateRequest response from broker
  117. expectClientErr bool // Expect an internal client-side error
  118. expectedBrokerError KError // Expected Kafka error returned by client
  119. tokProvider *TokenProvider
  120. }{
  121. {
  122. name: "SASL/OAUTHBEARER OK server response",
  123. mockSASLHandshakeResponse: NewMockSaslHandshakeResponse(t).
  124. SetEnabledMechanisms([]string{SASLTypeOAuth}),
  125. mockSASLAuthResponse: NewMockSaslAuthenticateResponse(t),
  126. expectClientErr: false,
  127. expectedBrokerError: ErrNoError,
  128. tokProvider: newTokenProvider(&AccessToken{Token: "access-token-123"}, nil),
  129. },
  130. {
  131. name: "SASL/OAUTHBEARER authentication failure response",
  132. mockSASLHandshakeResponse: NewMockSaslHandshakeResponse(t).
  133. SetEnabledMechanisms([]string{SASLTypeOAuth}),
  134. mockSASLAuthResponse: NewMockSequence(
  135. // First, the broker response with a challenge
  136. NewMockSaslAuthenticateResponse(t).
  137. SetAuthBytes([]byte(`{"status":"invalid_request1"}`)),
  138. // Next, the client terminates the token exchange. Finally, the
  139. // broker responds with an error message.
  140. NewMockSaslAuthenticateResponse(t).
  141. SetAuthBytes([]byte(`{"status":"invalid_request2"}`)).
  142. SetError(ErrSASLAuthenticationFailed),
  143. ),
  144. expectClientErr: true,
  145. expectedBrokerError: ErrSASLAuthenticationFailed,
  146. tokProvider: newTokenProvider(&AccessToken{Token: "access-token-123"}, nil),
  147. },
  148. {
  149. name: "SASL/OAUTHBEARER handshake failure response",
  150. mockSASLHandshakeResponse: NewMockSaslHandshakeResponse(t).
  151. SetEnabledMechanisms([]string{SASLTypeOAuth}).
  152. SetError(ErrSASLAuthenticationFailed),
  153. mockSASLAuthResponse: NewMockSaslAuthenticateResponse(t),
  154. expectClientErr: true,
  155. expectedBrokerError: ErrSASLAuthenticationFailed,
  156. tokProvider: newTokenProvider(&AccessToken{Token: "access-token-123"}, nil),
  157. },
  158. {
  159. name: "SASL/OAUTHBEARER token generation error",
  160. mockSASLHandshakeResponse: NewMockSaslHandshakeResponse(t).
  161. SetEnabledMechanisms([]string{SASLTypeOAuth}),
  162. mockSASLAuthResponse: NewMockSaslAuthenticateResponse(t),
  163. expectClientErr: true,
  164. expectedBrokerError: ErrNoError,
  165. tokProvider: newTokenProvider(&AccessToken{Token: "access-token-123"}, ErrTokenFailure),
  166. },
  167. {
  168. name: "SASL/OAUTHBEARER invalid extension",
  169. mockSASLHandshakeResponse: NewMockSaslHandshakeResponse(t).
  170. SetEnabledMechanisms([]string{SASLTypeOAuth}),
  171. mockSASLAuthResponse: NewMockSaslAuthenticateResponse(t),
  172. expectClientErr: true,
  173. expectedBrokerError: ErrNoError,
  174. tokProvider: newTokenProvider(&AccessToken{
  175. Token: "access-token-123",
  176. Extensions: map[string]string{"auth": "auth-value"},
  177. }, nil),
  178. },
  179. }
  180. for i, test := range testTable {
  181. // mockBroker mocks underlying network logic and broker responses
  182. mockBroker := NewMockBroker(t, 0)
  183. mockBroker.SetHandlerByMap(map[string]MockResponse{
  184. "SaslAuthenticateRequest": test.mockSASLAuthResponse,
  185. "SaslHandshakeRequest": test.mockSASLHandshakeResponse,
  186. })
  187. // broker executes SASL requests against mockBroker
  188. broker := NewBroker(mockBroker.Addr())
  189. broker.requestRate = metrics.NilMeter{}
  190. broker.outgoingByteRate = metrics.NilMeter{}
  191. broker.incomingByteRate = metrics.NilMeter{}
  192. broker.requestSize = metrics.NilHistogram{}
  193. broker.responseSize = metrics.NilHistogram{}
  194. broker.responseRate = metrics.NilMeter{}
  195. broker.requestLatency = metrics.NilHistogram{}
  196. broker.requestsInFlight = metrics.NilCounter{}
  197. conf := NewConfig()
  198. conf.Net.SASL.Mechanism = SASLTypeOAuth
  199. conf.Net.SASL.TokenProvider = test.tokProvider
  200. broker.conf = conf
  201. dialer := net.Dialer{
  202. Timeout: conf.Net.DialTimeout,
  203. KeepAlive: conf.Net.KeepAlive,
  204. LocalAddr: conf.Net.LocalAddr,
  205. }
  206. conn, err := dialer.Dial("tcp", mockBroker.listener.Addr().String())
  207. if err != nil {
  208. t.Fatal(err)
  209. }
  210. broker.conn = conn
  211. err = broker.authenticateViaSASL()
  212. if test.expectedBrokerError != ErrNoError {
  213. if test.expectedBrokerError != err {
  214. t.Errorf("[%d]:[%s] Expected %s auth error, got %s\n", i, test.name, test.expectedBrokerError, err)
  215. }
  216. } else if test.expectClientErr && err == nil {
  217. t.Errorf("[%d]:[%s] Expected a client error and got none\n", i, test.name)
  218. } else if !test.expectClientErr && err != nil {
  219. t.Errorf("[%d]:[%s] Unexpected error, got %s\n", i, test.name, err)
  220. }
  221. mockBroker.Close()
  222. }
  223. }
  // MockSCRAMClient is a scripted SCRAM client for tests: it answers the empty
  // challenge with "ping", finishes on the challenge "pong" and rejects any
  // other challenge.
  type MockSCRAMClient struct {
  	done bool // flipped to true once the "pong" challenge has been seen
  }

  // Begin accepts any credentials and never fails.
  func (m *MockSCRAMClient) Begin(userName, password, authzID string) (err error) {
  	return nil
  }

  // Step advances the scripted exchange by one challenge.
  //
  //   - ""     -> responds "ping"
  //   - "pong" -> marks the exchange done, responds with the empty string
  //   - other  -> returns an authentication error
  func (m *MockSCRAMClient) Step(challenge string) (response string, err error) {
  	switch challenge {
  	case "":
  		response = "ping"
  	case "pong":
  		m.done = true
  	default:
  		err = errors.New("failed to authenticate :(")
  	}
  	return response, err
  }

  // Done reports whether the "pong" challenge has been processed.
  func (m *MockSCRAMClient) Done() bool {
  	return m.done
  }
  244. var _ SCRAMClient = &MockSCRAMClient{}
  245. func TestSASLSCRAMSHAXXX(t *testing.T) {
  246. testTable := []struct {
  247. name string
  248. mockHandshakeErr KError
  249. mockSASLAuthErr KError
  250. expectClientErr bool
  251. scramClient *MockSCRAMClient
  252. scramChallengeResp string
  253. }{
  254. {
  255. name: "SASL/SCRAMSHAXXX successfull authentication",
  256. mockHandshakeErr: ErrNoError,
  257. scramClient: &MockSCRAMClient{},
  258. scramChallengeResp: "pong",
  259. },
  260. {
  261. name: "SASL/SCRAMSHAXXX SCRAM client step error client",
  262. mockHandshakeErr: ErrNoError,
  263. mockSASLAuthErr: ErrNoError,
  264. scramClient: &MockSCRAMClient{},
  265. scramChallengeResp: "gong",
  266. expectClientErr: true,
  267. },
  268. {
  269. name: "SASL/SCRAMSHAXXX server authentication error",
  270. mockHandshakeErr: ErrNoError,
  271. mockSASLAuthErr: ErrSASLAuthenticationFailed,
  272. scramClient: &MockSCRAMClient{},
  273. scramChallengeResp: "pong",
  274. },
  275. {
  276. name: "SASL/SCRAMSHAXXX unsupported SCRAM mechanism",
  277. mockHandshakeErr: ErrUnsupportedSASLMechanism,
  278. mockSASLAuthErr: ErrNoError,
  279. scramClient: &MockSCRAMClient{},
  280. scramChallengeResp: "pong",
  281. },
  282. }
  283. for i, test := range testTable {
  284. // mockBroker mocks underlying network logic and broker responses
  285. mockBroker := NewMockBroker(t, 0)
  286. broker := NewBroker(mockBroker.Addr())
  287. // broker executes SASL requests against mockBroker
  288. broker.requestRate = metrics.NilMeter{}
  289. broker.outgoingByteRate = metrics.NilMeter{}
  290. broker.incomingByteRate = metrics.NilMeter{}
  291. broker.requestSize = metrics.NilHistogram{}
  292. broker.responseSize = metrics.NilHistogram{}
  293. broker.responseRate = metrics.NilMeter{}
  294. broker.requestLatency = metrics.NilHistogram{}
  295. broker.requestsInFlight = metrics.NilCounter{}
  296. mockSASLAuthResponse := NewMockSaslAuthenticateResponse(t).SetAuthBytes([]byte(test.scramChallengeResp))
  297. mockSASLHandshakeResponse := NewMockSaslHandshakeResponse(t).SetEnabledMechanisms([]string{SASLTypeSCRAMSHA256, SASLTypeSCRAMSHA512})
  298. if test.mockSASLAuthErr != ErrNoError {
  299. mockSASLAuthResponse = mockSASLAuthResponse.SetError(test.mockSASLAuthErr)
  300. }
  301. if test.mockHandshakeErr != ErrNoError {
  302. mockSASLHandshakeResponse = mockSASLHandshakeResponse.SetError(test.mockHandshakeErr)
  303. }
  304. mockBroker.SetHandlerByMap(map[string]MockResponse{
  305. "SaslAuthenticateRequest": mockSASLAuthResponse,
  306. "SaslHandshakeRequest": mockSASLHandshakeResponse,
  307. })
  308. conf := NewConfig()
  309. conf.Net.SASL.Mechanism = SASLTypeSCRAMSHA512
  310. conf.Net.SASL.SCRAMClientGeneratorFunc = func() SCRAMClient { return test.scramClient }
  311. broker.conf = conf
  312. dialer := net.Dialer{
  313. Timeout: conf.Net.DialTimeout,
  314. KeepAlive: conf.Net.KeepAlive,
  315. LocalAddr: conf.Net.LocalAddr,
  316. }
  317. conn, err := dialer.Dial("tcp", mockBroker.listener.Addr().String())
  318. if err != nil {
  319. t.Fatal(err)
  320. }
  321. broker.conn = conn
  322. err = broker.authenticateViaSASL()
  323. if test.mockSASLAuthErr != ErrNoError {
  324. if test.mockSASLAuthErr != err {
  325. t.Errorf("[%d]:[%s] Expected %s SASL authentication error, got %s\n", i, test.name, test.mockHandshakeErr, err)
  326. }
  327. } else if test.mockHandshakeErr != ErrNoError {
  328. if test.mockHandshakeErr != err {
  329. t.Errorf("[%d]:[%s] Expected %s handshake error, got %s\n", i, test.name, test.mockHandshakeErr, err)
  330. }
  331. } else if test.expectClientErr && err == nil {
  332. t.Errorf("[%d]:[%s] Expected a client error and got none\n", i, test.name)
  333. } else if !test.expectClientErr && err != nil {
  334. t.Errorf("[%d]:[%s] Unexpected error, got %s\n", i, test.name, err)
  335. }
  336. mockBroker.Close()
  337. }
  338. }
  339. func TestSASLPlainAuth(t *testing.T) {
  340. testTable := []struct {
  341. name string
  342. authidentity string
  343. mockAuthErr KError // Mock and expect error returned from SaslAuthenticateRequest
  344. mockHandshakeErr KError // Mock and expect error returned from SaslHandshakeRequest
  345. expectClientErr bool // Expect an internal client-side error
  346. }{
  347. {
  348. name: "SASL Plain OK server response",
  349. mockAuthErr: ErrNoError,
  350. mockHandshakeErr: ErrNoError,
  351. },
  352. {
  353. name: "SASL Plain OK server response with authidentity",
  354. authidentity: "authid",
  355. mockAuthErr: ErrNoError,
  356. mockHandshakeErr: ErrNoError,
  357. },
  358. {
  359. name: "SASL Plain authentication failure response",
  360. mockAuthErr: ErrSASLAuthenticationFailed,
  361. mockHandshakeErr: ErrNoError,
  362. },
  363. {
  364. name: "SASL Plain handshake failure response",
  365. mockAuthErr: ErrNoError,
  366. mockHandshakeErr: ErrSASLAuthenticationFailed,
  367. },
  368. }
  369. for i, test := range testTable {
  370. // mockBroker mocks underlying network logic and broker responses
  371. mockBroker := NewMockBroker(t, 0)
  372. mockSASLAuthResponse := NewMockSaslAuthenticateResponse(t).
  373. SetAuthBytes([]byte(`response_payload`))
  374. if test.mockAuthErr != ErrNoError {
  375. mockSASLAuthResponse = mockSASLAuthResponse.SetError(test.mockAuthErr)
  376. }
  377. mockSASLHandshakeResponse := NewMockSaslHandshakeResponse(t).
  378. SetEnabledMechanisms([]string{SASLTypePlaintext})
  379. if test.mockHandshakeErr != ErrNoError {
  380. mockSASLHandshakeResponse = mockSASLHandshakeResponse.SetError(test.mockHandshakeErr)
  381. }
  382. mockBroker.SetHandlerByMap(map[string]MockResponse{
  383. "SaslAuthenticateRequest": mockSASLAuthResponse,
  384. "SaslHandshakeRequest": mockSASLHandshakeResponse,
  385. })
  386. // broker executes SASL requests against mockBroker
  387. broker := NewBroker(mockBroker.Addr())
  388. broker.requestRate = metrics.NilMeter{}
  389. broker.outgoingByteRate = metrics.NilMeter{}
  390. broker.incomingByteRate = metrics.NilMeter{}
  391. broker.requestSize = metrics.NilHistogram{}
  392. broker.responseSize = metrics.NilHistogram{}
  393. broker.responseRate = metrics.NilMeter{}
  394. broker.requestLatency = metrics.NilHistogram{}
  395. broker.requestsInFlight = metrics.NilCounter{}
  396. conf := NewConfig()
  397. conf.Net.SASL.Mechanism = SASLTypePlaintext
  398. conf.Net.SASL.AuthIdentity = test.authidentity
  399. conf.Net.SASL.User = "token"
  400. conf.Net.SASL.Password = "password"
  401. conf.Net.SASL.Version = SASLHandshakeV1
  402. broker.conf = conf
  403. broker.conf.Version = V1_0_0_0
  404. dialer := net.Dialer{
  405. Timeout: conf.Net.DialTimeout,
  406. KeepAlive: conf.Net.KeepAlive,
  407. LocalAddr: conf.Net.LocalAddr,
  408. }
  409. conn, err := dialer.Dial("tcp", mockBroker.listener.Addr().String())
  410. if err != nil {
  411. t.Fatal(err)
  412. }
  413. broker.conn = conn
  414. err = broker.authenticateViaSASL()
  415. if err == nil {
  416. for _, rr := range mockBroker.History() {
  417. switch r := rr.Request.(type) {
  418. case *SaslAuthenticateRequest:
  419. x := bytes.SplitN(r.SaslAuthBytes, []byte("\x00"), 3)
  420. if string(x[0]) != conf.Net.SASL.AuthIdentity {
  421. t.Errorf("[%d]:[%s] expected %s auth identity, got %s\n", i, test.name, conf.Net.SASL.AuthIdentity, x[0])
  422. }
  423. if string(x[1]) != conf.Net.SASL.User {
  424. t.Errorf("[%d]:[%s] expected %s user, got %s\n", i, test.name, conf.Net.SASL.User, x[1])
  425. }
  426. if string(x[2]) != conf.Net.SASL.Password {
  427. t.Errorf("[%d]:[%s] expected %s password, got %s\n", i, test.name, conf.Net.SASL.Password, x[2])
  428. }
  429. }
  430. }
  431. }
  432. if test.mockAuthErr != ErrNoError {
  433. if test.mockAuthErr != err {
  434. t.Errorf("[%d]:[%s] Expected %s auth error, got %s\n", i, test.name, test.mockAuthErr, err)
  435. }
  436. } else if test.mockHandshakeErr != ErrNoError {
  437. if test.mockHandshakeErr != err {
  438. t.Errorf("[%d]:[%s] Expected %s handshake error, got %s\n", i, test.name, test.mockHandshakeErr, err)
  439. }
  440. } else if test.expectClientErr && err == nil {
  441. t.Errorf("[%d]:[%s] Expected a client error and got none\n", i, test.name)
  442. } else if !test.expectClientErr && err != nil {
  443. t.Errorf("[%d]:[%s] Unexpected error, got %s\n", i, test.name, err)
  444. }
  445. mockBroker.Close()
  446. }
  447. }
  448. // TestSASLReadTimeout ensures that the broker connection won't block forever
  449. // if the remote end never responds after the handshake
  450. func TestSASLReadTimeout(t *testing.T) {
  451. mockBroker := NewMockBroker(t, 0)
  452. defer mockBroker.Close()
  453. mockSASLAuthResponse := NewMockSaslAuthenticateResponse(t).
  454. SetAuthBytes([]byte(`response_payload`))
  455. mockBroker.SetHandlerByMap(map[string]MockResponse{
  456. "SaslAuthenticateRequest": mockSASLAuthResponse,
  457. })
  458. broker := NewBroker(mockBroker.Addr())
  459. {
  460. broker.requestRate = metrics.NilMeter{}
  461. broker.outgoingByteRate = metrics.NilMeter{}
  462. broker.incomingByteRate = metrics.NilMeter{}
  463. broker.requestSize = metrics.NilHistogram{}
  464. broker.responseSize = metrics.NilHistogram{}
  465. broker.responseRate = metrics.NilMeter{}
  466. broker.requestLatency = metrics.NilHistogram{}
  467. broker.requestsInFlight = metrics.NilCounter{}
  468. }
  469. conf := NewConfig()
  470. {
  471. conf.Net.ReadTimeout = time.Millisecond
  472. conf.Net.SASL.Mechanism = SASLTypePlaintext
  473. conf.Net.SASL.User = "token"
  474. conf.Net.SASL.Password = "password"
  475. conf.Net.SASL.Version = SASLHandshakeV1
  476. }
  477. broker.conf = conf
  478. broker.conf.Version = V1_0_0_0
  479. dialer := net.Dialer{}
  480. conn, err := dialer.Dial("tcp", mockBroker.listener.Addr().String())
  481. if err != nil {
  482. t.Fatal(err)
  483. }
  484. broker.conn = conn
  485. err = broker.authenticateViaSASL()
  486. if err == nil {
  487. t.Errorf("should never happen - expected read timeout")
  488. }
  489. }
  490. func TestGSSAPIKerberosAuth_Authorize(t *testing.T) {
  491. testTable := []struct {
  492. name string
  493. error error
  494. mockKerberosClient bool
  495. errorStage string
  496. badResponse bool
  497. badKeyChecksum bool
  498. }{
  499. {
  500. name: "Kerberos authentication success",
  501. error: nil,
  502. mockKerberosClient: true,
  503. },
  504. {
  505. name: "Kerberos login fails",
  506. error: krberror.NewErrorf(krberror.KDCError, "KDC_Error: AS Exchange Error: "+
  507. "kerberos error response from KDC: KRB Error: (24) KDC_ERR_PREAUTH_FAILED Pre-authenti"+
  508. "cation information was invalid - PREAUTH_FAILED"),
  509. mockKerberosClient: true,
  510. errorStage: "login",
  511. },
  512. {
  513. name: "Kerberos service ticket fails",
  514. error: krberror.NewErrorf(krberror.KDCError, "KDC_Error: AS Exchange Error: "+
  515. "kerberos error response from KDC: KRB Error: (24) KDC_ERR_PREAUTH_FAILED Pre-authenti"+
  516. "cation information was invalid - PREAUTH_FAILED"),
  517. mockKerberosClient: true,
  518. errorStage: "service_ticket",
  519. },
  520. {
  521. name: "Kerberos client creation fails",
  522. error: errors.New("configuration file could not be opened: krb5.conf open krb5.conf: no such file or directory"),
  523. },
  524. {
  525. name: "Bad server response, unmarshall key error",
  526. error: errors.New("bytes shorter than header length"),
  527. badResponse: true,
  528. mockKerberosClient: true,
  529. },
  530. {
  531. name: "Bad token checksum",
  532. error: errors.New("checksum mismatch. Computed: 39feb88ac2459f2b77738493, Contained in token: ffffffffffffffff00000000"),
  533. badResponse: false,
  534. badKeyChecksum: true,
  535. mockKerberosClient: true,
  536. },
  537. }
  538. for i, test := range testTable {
  539. mockBroker := NewMockBroker(t, 0)
  540. // broker executes SASL requests against mockBroker
  541. mockBroker.SetGSSAPIHandler(func(bytes []byte) []byte {
  542. return nil
  543. })
  544. broker := NewBroker(mockBroker.Addr())
  545. broker.requestRate = metrics.NilMeter{}
  546. broker.outgoingByteRate = metrics.NilMeter{}
  547. broker.incomingByteRate = metrics.NilMeter{}
  548. broker.requestSize = metrics.NilHistogram{}
  549. broker.responseSize = metrics.NilHistogram{}
  550. broker.responseRate = metrics.NilMeter{}
  551. broker.requestLatency = metrics.NilHistogram{}
  552. broker.requestsInFlight = metrics.NilCounter{}
  553. conf := NewConfig()
  554. conf.Net.SASL.Mechanism = SASLTypeGSSAPI
  555. conf.Net.SASL.GSSAPI.ServiceName = "kafka"
  556. conf.Net.SASL.GSSAPI.KerberosConfigPath = "krb5.conf"
  557. conf.Net.SASL.GSSAPI.Realm = "EXAMPLE.COM"
  558. conf.Net.SASL.GSSAPI.Username = "kafka"
  559. conf.Net.SASL.GSSAPI.Password = "kafka"
  560. conf.Net.SASL.GSSAPI.KeyTabPath = "kafka.keytab"
  561. conf.Net.SASL.GSSAPI.AuthType = KRB5_USER_AUTH
  562. broker.conf = conf
  563. broker.conf.Version = V1_0_0_0
  564. dialer := net.Dialer{
  565. Timeout: conf.Net.DialTimeout,
  566. KeepAlive: conf.Net.KeepAlive,
  567. LocalAddr: conf.Net.LocalAddr,
  568. }
  569. conn, err := dialer.Dial("tcp", mockBroker.listener.Addr().String())
  570. if err != nil {
  571. t.Fatal(err)
  572. }
  573. gssapiHandler := KafkaGSSAPIHandler{
  574. client: &MockKerberosClient{},
  575. badResponse: test.badResponse,
  576. badKeyChecksum: test.badKeyChecksum,
  577. }
  578. mockBroker.SetGSSAPIHandler(gssapiHandler.MockKafkaGSSAPI)
  579. broker.conn = conn
  580. if test.mockKerberosClient {
  581. broker.kerberosAuthenticator.NewKerberosClientFunc = func(config *GSSAPIConfig) (KerberosClient, error) {
  582. return &MockKerberosClient{
  583. mockError: test.error,
  584. errorStage: test.errorStage,
  585. }, nil
  586. }
  587. } else {
  588. broker.kerberosAuthenticator.NewKerberosClientFunc = nil
  589. }
  590. err = broker.authenticateViaSASL()
  591. if err != nil && test.error != nil {
  592. if test.error.Error() != err.Error() {
  593. t.Errorf("[%d] Expected error:%s, got:%s.", i, test.error, err)
  594. }
  595. } else if (err == nil && test.error != nil) || (err != nil && test.error == nil) {
  596. t.Errorf("[%d] Expected error:%s, got:%s.", i, test.error, err)
  597. }
  598. mockBroker.Close()
  599. }
  600. }
  601. func TestBuildClientFirstMessage(t *testing.T) {
  602. testTable := []struct {
  603. name string
  604. token *AccessToken
  605. expected []byte
  606. expectError bool
  607. }{
  608. {
  609. name: "Build SASL client initial response with two extensions",
  610. token: &AccessToken{
  611. Token: "the-token",
  612. Extensions: map[string]string{
  613. "x": "1",
  614. "y": "2",
  615. },
  616. },
  617. expected: []byte("n,,\x01auth=Bearer the-token\x01x=1\x01y=2\x01\x01"),
  618. },
  619. {
  620. name: "Build SASL client initial response with no extensions",
  621. token: &AccessToken{Token: "the-token"},
  622. expected: []byte("n,,\x01auth=Bearer the-token\x01\x01"),
  623. },
  624. {
  625. name: "Build SASL client initial response using reserved extension",
  626. token: &AccessToken{
  627. Token: "the-token",
  628. Extensions: map[string]string{
  629. "auth": "auth-value",
  630. },
  631. },
  632. expected: []byte(""),
  633. expectError: true,
  634. },
  635. }
  636. for i, test := range testTable {
  637. actual, err := buildClientFirstMessage(test.token)
  638. if !reflect.DeepEqual(test.expected, actual) {
  639. t.Errorf("Expected %s, got %s\n", test.expected, actual)
  640. }
  641. if test.expectError && err == nil {
  642. t.Errorf("[%d]:[%s] Expected an error but did not get one", i, test.name)
  643. }
  644. if !test.expectError && err != nil {
  645. t.Errorf("[%d]:[%s] Expected no error but got %s\n", i, test.name, err)
  646. }
  647. }
  648. }
  649. // We're not testing encoding/decoding here, so most of the requests/responses will be empty for simplicity's sake
  650. var brokerTestTable = []struct {
  651. version KafkaVersion
  652. name string
  653. response []byte
  654. runner func(*testing.T, *Broker)
  655. }{
  656. {V0_10_0_0,
  657. "MetadataRequest",
  658. []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  659. func(t *testing.T, broker *Broker) {
  660. request := MetadataRequest{}
  661. response, err := broker.GetMetadata(&request)
  662. if err != nil {
  663. t.Error(err)
  664. }
  665. if response == nil {
  666. t.Error("Metadata request got no response!")
  667. }
  668. }},
  669. {V0_10_0_0,
  670. "ConsumerMetadataRequest",
  671. []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 't', 0x00, 0x00, 0x00, 0x00},
  672. func(t *testing.T, broker *Broker) {
  673. request := ConsumerMetadataRequest{}
  674. response, err := broker.GetConsumerMetadata(&request)
  675. if err != nil {
  676. t.Error(err)
  677. }
  678. if response == nil {
  679. t.Error("Consumer Metadata request got no response!")
  680. }
  681. }},
  682. {V0_10_0_0,
  683. "ProduceRequest (NoResponse)",
  684. []byte{},
  685. func(t *testing.T, broker *Broker) {
  686. request := ProduceRequest{}
  687. request.RequiredAcks = NoResponse
  688. response, err := broker.Produce(&request)
  689. if err != nil {
  690. t.Error(err)
  691. }
  692. if response != nil {
  693. t.Error("Produce request with NoResponse got a response!")
  694. }
  695. }},
  696. {V0_10_0_0,
  697. "ProduceRequest (WaitForLocal)",
  698. []byte{0x00, 0x00, 0x00, 0x00},
  699. func(t *testing.T, broker *Broker) {
  700. request := ProduceRequest{}
  701. request.RequiredAcks = WaitForLocal
  702. response, err := broker.Produce(&request)
  703. if err != nil {
  704. t.Error(err)
  705. }
  706. if response == nil {
  707. t.Error("Produce request without NoResponse got no response!")
  708. }
  709. }},
  710. {V0_10_0_0,
  711. "FetchRequest",
  712. []byte{0x00, 0x00, 0x00, 0x00},
  713. func(t *testing.T, broker *Broker) {
  714. request := FetchRequest{}
  715. response, err := broker.Fetch(&request)
  716. if err != nil {
  717. t.Error(err)
  718. }
  719. if response == nil {
  720. t.Error("Fetch request got no response!")
  721. }
  722. }},
  723. {V0_10_0_0,
  724. "OffsetFetchRequest",
  725. []byte{0x00, 0x00, 0x00, 0x00},
  726. func(t *testing.T, broker *Broker) {
  727. request := OffsetFetchRequest{}
  728. response, err := broker.FetchOffset(&request)
  729. if err != nil {
  730. t.Error(err)
  731. }
  732. if response == nil {
  733. t.Error("OffsetFetch request got no response!")
  734. }
  735. }},
  736. {V0_10_0_0,
  737. "OffsetCommitRequest",
  738. []byte{0x00, 0x00, 0x00, 0x00},
  739. func(t *testing.T, broker *Broker) {
  740. request := OffsetCommitRequest{}
  741. response, err := broker.CommitOffset(&request)
  742. if err != nil {
  743. t.Error(err)
  744. }
  745. if response == nil {
  746. t.Error("OffsetCommit request got no response!")
  747. }
  748. }},
  749. {V0_10_0_0,
  750. "OffsetRequest",
  751. []byte{0x00, 0x00, 0x00, 0x00},
  752. func(t *testing.T, broker *Broker) {
  753. request := OffsetRequest{}
  754. response, err := broker.GetAvailableOffsets(&request)
  755. if err != nil {
  756. t.Error(err)
  757. }
  758. if response == nil {
  759. t.Error("Offset request got no response!")
  760. }
  761. }},
  762. {V0_10_0_0,
  763. "JoinGroupRequest",
  764. []byte{0x00, 0x17, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  765. func(t *testing.T, broker *Broker) {
  766. request := JoinGroupRequest{}
  767. response, err := broker.JoinGroup(&request)
  768. if err != nil {
  769. t.Error(err)
  770. }
  771. if response == nil {
  772. t.Error("JoinGroup request got no response!")
  773. }
  774. }},
  775. {V0_10_0_0,
  776. "SyncGroupRequest",
  777. []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  778. func(t *testing.T, broker *Broker) {
  779. request := SyncGroupRequest{}
  780. response, err := broker.SyncGroup(&request)
  781. if err != nil {
  782. t.Error(err)
  783. }
  784. if response == nil {
  785. t.Error("SyncGroup request got no response!")
  786. }
  787. }},
  788. {V0_10_0_0,
  789. "LeaveGroupRequest",
  790. []byte{0x00, 0x00},
  791. func(t *testing.T, broker *Broker) {
  792. request := LeaveGroupRequest{}
  793. response, err := broker.LeaveGroup(&request)
  794. if err != nil {
  795. t.Error(err)
  796. }
  797. if response == nil {
  798. t.Error("LeaveGroup request got no response!")
  799. }
  800. }},
  801. {V0_10_0_0,
  802. "HeartbeatRequest",
  803. []byte{0x00, 0x00},
  804. func(t *testing.T, broker *Broker) {
  805. request := HeartbeatRequest{}
  806. response, err := broker.Heartbeat(&request)
  807. if err != nil {
  808. t.Error(err)
  809. }
  810. if response == nil {
  811. t.Error("Heartbeat request got no response!")
  812. }
  813. }},
  814. {V0_10_0_0,
  815. "ListGroupsRequest",
  816. []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  817. func(t *testing.T, broker *Broker) {
  818. request := ListGroupsRequest{}
  819. response, err := broker.ListGroups(&request)
  820. if err != nil {
  821. t.Error(err)
  822. }
  823. if response == nil {
  824. t.Error("ListGroups request got no response!")
  825. }
  826. }},
  827. {V0_10_0_0,
  828. "DescribeGroupsRequest",
  829. []byte{0x00, 0x00, 0x00, 0x00},
  830. func(t *testing.T, broker *Broker) {
  831. request := DescribeGroupsRequest{}
  832. response, err := broker.DescribeGroups(&request)
  833. if err != nil {
  834. t.Error(err)
  835. }
  836. if response == nil {
  837. t.Error("DescribeGroups request got no response!")
  838. }
  839. }},
  840. {V0_10_0_0,
  841. "ApiVersionsRequest",
  842. []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  843. func(t *testing.T, broker *Broker) {
  844. request := ApiVersionsRequest{}
  845. response, err := broker.ApiVersions(&request)
  846. if err != nil {
  847. t.Error(err)
  848. }
  849. if response == nil {
  850. t.Error("ApiVersions request got no response!")
  851. }
  852. }},
  853. {V1_1_0_0,
  854. "DeleteGroupsRequest",
  855. []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  856. func(t *testing.T, broker *Broker) {
  857. request := DeleteGroupsRequest{}
  858. response, err := broker.DeleteGroups(&request)
  859. if err != nil {
  860. t.Error(err)
  861. }
  862. if response == nil {
  863. t.Error("DeleteGroups request got no response!")
  864. }
  865. }},
  866. }
  867. func validateBrokerMetrics(t *testing.T, broker *Broker, mockBrokerMetrics brokerMetrics) {
  868. metricValidators := newMetricValidators()
  869. mockBrokerBytesRead := mockBrokerMetrics.bytesRead
  870. mockBrokerBytesWritten := mockBrokerMetrics.bytesWritten
  871. // Check that the number of bytes sent corresponds to what the mock broker received
  872. metricValidators.registerForAllBrokers(broker, countMeterValidator("incoming-byte-rate", mockBrokerBytesWritten))
  873. if mockBrokerBytesWritten == 0 {
  874. // This a ProduceRequest with NoResponse
  875. metricValidators.registerForAllBrokers(broker, countMeterValidator("response-rate", 0))
  876. metricValidators.registerForAllBrokers(broker, countHistogramValidator("response-size", 0))
  877. metricValidators.registerForAllBrokers(broker, minMaxHistogramValidator("response-size", 0, 0))
  878. } else {
  879. metricValidators.registerForAllBrokers(broker, countMeterValidator("response-rate", 1))
  880. metricValidators.registerForAllBrokers(broker, countHistogramValidator("response-size", 1))
  881. metricValidators.registerForAllBrokers(broker, minMaxHistogramValidator("response-size", mockBrokerBytesWritten, mockBrokerBytesWritten))
  882. }
  883. // Check that the number of bytes received corresponds to what the mock broker sent
  884. metricValidators.registerForAllBrokers(broker, countMeterValidator("outgoing-byte-rate", mockBrokerBytesRead))
  885. metricValidators.registerForAllBrokers(broker, countMeterValidator("request-rate", 1))
  886. metricValidators.registerForAllBrokers(broker, countHistogramValidator("request-size", 1))
  887. metricValidators.registerForAllBrokers(broker, minMaxHistogramValidator("request-size", mockBrokerBytesRead, mockBrokerBytesRead))
  888. // Check that there is no more requests in flight
  889. metricValidators.registerForAllBrokers(broker, counterValidator("requests-in-flight", 0))
  890. // Run the validators
  891. metricValidators.run(t, broker.conf.MetricRegistry)
  892. }