broker_test.go 29 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003
  1. package sarama
  2. import (
  3. "bytes"
  4. "errors"
  5. "fmt"
  6. "net"
  7. "reflect"
  8. "testing"
  9. "time"
  10. "github.com/rcrowley/go-metrics"
  11. "gopkg.in/jcmturner/gokrb5.v7/krberror"
  12. )
  13. func ExampleBroker() {
  14. broker := NewBroker("localhost:9092")
  15. err := broker.Open(nil)
  16. if err != nil {
  17. panic(err)
  18. }
  19. request := MetadataRequest{Topics: []string{"myTopic"}}
  20. response, err := broker.GetMetadata(&request)
  21. if err != nil {
  22. _ = broker.Close()
  23. panic(err)
  24. }
  25. fmt.Println("There are", len(response.Topics), "topics active in the cluster.")
  26. if err = broker.Close(); err != nil {
  27. panic(err)
  28. }
  29. }
  30. type mockEncoder struct {
  31. bytes []byte
  32. }
  33. func (m mockEncoder) encode(pe packetEncoder) error {
  34. return pe.putRawBytes(m.bytes)
  35. }
  36. func (m mockEncoder) headerVersion() int16 {
  37. return 0
  38. }
  39. type brokerMetrics struct {
  40. bytesRead int
  41. bytesWritten int
  42. }
  43. func TestBrokerAccessors(t *testing.T) {
  44. broker := NewBroker("abc:123")
  45. if broker.ID() != -1 {
  46. t.Error("New broker didn't have an ID of -1.")
  47. }
  48. if broker.Addr() != "abc:123" {
  49. t.Error("New broker didn't have the correct address")
  50. }
  51. if broker.Rack() != "" {
  52. t.Error("New broker didn't have an unknown rack.")
  53. }
  54. broker.id = 34
  55. if broker.ID() != 34 {
  56. t.Error("Manually setting broker ID did not take effect.")
  57. }
  58. rack := "dc1"
  59. broker.rack = &rack
  60. if broker.Rack() != rack {
  61. t.Error("Manually setting broker rack did not take effect.")
  62. }
  63. }
  64. func TestSimpleBrokerCommunication(t *testing.T) {
  65. for _, tt := range brokerTestTable {
  66. Logger.Printf("Testing broker communication for %s", tt.name)
  67. mb := NewMockBroker(t, 0)
  68. mb.Returns(&mockEncoder{tt.response})
  69. pendingNotify := make(chan brokerMetrics)
  70. // Register a callback to be notified about successful requests
  71. mb.SetNotifier(func(bytesRead, bytesWritten int) {
  72. pendingNotify <- brokerMetrics{bytesRead, bytesWritten}
  73. })
  74. broker := NewBroker(mb.Addr())
  75. // Set the broker id in order to validate local broker metrics
  76. broker.id = 0
  77. conf := NewConfig()
  78. conf.Version = tt.version
  79. err := broker.Open(conf)
  80. if err != nil {
  81. t.Fatal(err)
  82. }
  83. tt.runner(t, broker)
  84. // Wait up to 500 ms for the remote broker to process the request and
  85. // notify us about the metrics
  86. timeout := 500 * time.Millisecond
  87. select {
  88. case mockBrokerMetrics := <-pendingNotify:
  89. validateBrokerMetrics(t, broker, mockBrokerMetrics)
  90. case <-time.After(timeout):
  91. t.Errorf("No request received for: %s after waiting for %v", tt.name, timeout)
  92. }
  93. mb.Close()
  94. err = broker.Close()
  95. if err != nil {
  96. t.Error(err)
  97. }
  98. }
  99. }
  100. var ErrTokenFailure = errors.New("Failure generating token")
  101. type TokenProvider struct {
  102. accessToken *AccessToken
  103. err error
  104. }
  105. func (t *TokenProvider) Token() (*AccessToken, error) {
  106. return t.accessToken, t.err
  107. }
  108. func newTokenProvider(token *AccessToken, err error) *TokenProvider {
  109. return &TokenProvider{
  110. accessToken: token,
  111. err: err,
  112. }
  113. }
  114. func TestSASLOAuthBearer(t *testing.T) {
  115. testTable := []struct {
  116. name string
  117. authidentity string
  118. mockSASLHandshakeResponse MockResponse // Mock SaslHandshakeRequest response from broker
  119. mockSASLAuthResponse MockResponse // Mock SaslAuthenticateRequest response from broker
  120. expectClientErr bool // Expect an internal client-side error
  121. expectedBrokerError KError // Expected Kafka error returned by client
  122. tokProvider *TokenProvider
  123. }{
  124. {
  125. name: "SASL/OAUTHBEARER OK server response",
  126. mockSASLHandshakeResponse: NewMockSaslHandshakeResponse(t).
  127. SetEnabledMechanisms([]string{SASLTypeOAuth}),
  128. mockSASLAuthResponse: NewMockSaslAuthenticateResponse(t),
  129. expectClientErr: false,
  130. expectedBrokerError: ErrNoError,
  131. tokProvider: newTokenProvider(&AccessToken{Token: "access-token-123"}, nil),
  132. },
  133. {
  134. name: "SASL/OAUTHBEARER authentication failure response",
  135. mockSASLHandshakeResponse: NewMockSaslHandshakeResponse(t).
  136. SetEnabledMechanisms([]string{SASLTypeOAuth}),
  137. mockSASLAuthResponse: NewMockSequence(
  138. // First, the broker response with a challenge
  139. NewMockSaslAuthenticateResponse(t).
  140. SetAuthBytes([]byte(`{"status":"invalid_request1"}`)),
  141. // Next, the client terminates the token exchange. Finally, the
  142. // broker responds with an error message.
  143. NewMockSaslAuthenticateResponse(t).
  144. SetAuthBytes([]byte(`{"status":"invalid_request2"}`)).
  145. SetError(ErrSASLAuthenticationFailed),
  146. ),
  147. expectClientErr: true,
  148. expectedBrokerError: ErrSASLAuthenticationFailed,
  149. tokProvider: newTokenProvider(&AccessToken{Token: "access-token-123"}, nil),
  150. },
  151. {
  152. name: "SASL/OAUTHBEARER handshake failure response",
  153. mockSASLHandshakeResponse: NewMockSaslHandshakeResponse(t).
  154. SetEnabledMechanisms([]string{SASLTypeOAuth}).
  155. SetError(ErrSASLAuthenticationFailed),
  156. mockSASLAuthResponse: NewMockSaslAuthenticateResponse(t),
  157. expectClientErr: true,
  158. expectedBrokerError: ErrSASLAuthenticationFailed,
  159. tokProvider: newTokenProvider(&AccessToken{Token: "access-token-123"}, nil),
  160. },
  161. {
  162. name: "SASL/OAUTHBEARER token generation error",
  163. mockSASLHandshakeResponse: NewMockSaslHandshakeResponse(t).
  164. SetEnabledMechanisms([]string{SASLTypeOAuth}),
  165. mockSASLAuthResponse: NewMockSaslAuthenticateResponse(t),
  166. expectClientErr: true,
  167. expectedBrokerError: ErrNoError,
  168. tokProvider: newTokenProvider(&AccessToken{Token: "access-token-123"}, ErrTokenFailure),
  169. },
  170. {
  171. name: "SASL/OAUTHBEARER invalid extension",
  172. mockSASLHandshakeResponse: NewMockSaslHandshakeResponse(t).
  173. SetEnabledMechanisms([]string{SASLTypeOAuth}),
  174. mockSASLAuthResponse: NewMockSaslAuthenticateResponse(t),
  175. expectClientErr: true,
  176. expectedBrokerError: ErrNoError,
  177. tokProvider: newTokenProvider(&AccessToken{
  178. Token: "access-token-123",
  179. Extensions: map[string]string{"auth": "auth-value"},
  180. }, nil),
  181. },
  182. }
  183. for i, test := range testTable {
  184. // mockBroker mocks underlying network logic and broker responses
  185. mockBroker := NewMockBroker(t, 0)
  186. mockBroker.SetHandlerByMap(map[string]MockResponse{
  187. "SaslAuthenticateRequest": test.mockSASLAuthResponse,
  188. "SaslHandshakeRequest": test.mockSASLHandshakeResponse,
  189. })
  190. // broker executes SASL requests against mockBroker
  191. broker := NewBroker(mockBroker.Addr())
  192. broker.requestRate = metrics.NilMeter{}
  193. broker.outgoingByteRate = metrics.NilMeter{}
  194. broker.incomingByteRate = metrics.NilMeter{}
  195. broker.requestSize = metrics.NilHistogram{}
  196. broker.responseSize = metrics.NilHistogram{}
  197. broker.responseRate = metrics.NilMeter{}
  198. broker.requestLatency = metrics.NilHistogram{}
  199. broker.requestsInFlight = metrics.NilCounter{}
  200. conf := NewConfig()
  201. conf.Net.SASL.Mechanism = SASLTypeOAuth
  202. conf.Net.SASL.TokenProvider = test.tokProvider
  203. broker.conf = conf
  204. dialer := net.Dialer{
  205. Timeout: conf.Net.DialTimeout,
  206. KeepAlive: conf.Net.KeepAlive,
  207. LocalAddr: conf.Net.LocalAddr,
  208. }
  209. conn, err := dialer.Dial("tcp", mockBroker.listener.Addr().String())
  210. if err != nil {
  211. t.Fatal(err)
  212. }
  213. broker.conn = conn
  214. err = broker.authenticateViaSASL()
  215. if test.expectedBrokerError != ErrNoError {
  216. if test.expectedBrokerError != err {
  217. t.Errorf("[%d]:[%s] Expected %s auth error, got %s\n", i, test.name, test.expectedBrokerError, err)
  218. }
  219. } else if test.expectClientErr && err == nil {
  220. t.Errorf("[%d]:[%s] Expected a client error and got none\n", i, test.name)
  221. } else if !test.expectClientErr && err != nil {
  222. t.Errorf("[%d]:[%s] Unexpected error, got %s\n", i, test.name, err)
  223. }
  224. mockBroker.Close()
  225. }
  226. }
  227. // A mock scram client.
  228. type MockSCRAMClient struct {
  229. done bool
  230. }
  231. func (m *MockSCRAMClient) Begin(userName, password, authzID string) (err error) {
  232. return nil
  233. }
  234. func (m *MockSCRAMClient) Step(challenge string) (response string, err error) {
  235. if challenge == "" {
  236. return "ping", nil
  237. }
  238. if challenge == "pong" {
  239. m.done = true
  240. return "", nil
  241. }
  242. return "", errors.New("failed to authenticate :(")
  243. }
  244. func (m *MockSCRAMClient) Done() bool {
  245. return m.done
  246. }
  247. var _ SCRAMClient = &MockSCRAMClient{}
  248. func TestSASLSCRAMSHAXXX(t *testing.T) {
  249. testTable := []struct {
  250. name string
  251. mockHandshakeErr KError
  252. mockSASLAuthErr KError
  253. expectClientErr bool
  254. scramClient *MockSCRAMClient
  255. scramChallengeResp string
  256. }{
  257. {
  258. name: "SASL/SCRAMSHAXXX successfull authentication",
  259. mockHandshakeErr: ErrNoError,
  260. scramClient: &MockSCRAMClient{},
  261. scramChallengeResp: "pong",
  262. },
  263. {
  264. name: "SASL/SCRAMSHAXXX SCRAM client step error client",
  265. mockHandshakeErr: ErrNoError,
  266. mockSASLAuthErr: ErrNoError,
  267. scramClient: &MockSCRAMClient{},
  268. scramChallengeResp: "gong",
  269. expectClientErr: true,
  270. },
  271. {
  272. name: "SASL/SCRAMSHAXXX server authentication error",
  273. mockHandshakeErr: ErrNoError,
  274. mockSASLAuthErr: ErrSASLAuthenticationFailed,
  275. scramClient: &MockSCRAMClient{},
  276. scramChallengeResp: "pong",
  277. },
  278. {
  279. name: "SASL/SCRAMSHAXXX unsupported SCRAM mechanism",
  280. mockHandshakeErr: ErrUnsupportedSASLMechanism,
  281. mockSASLAuthErr: ErrNoError,
  282. scramClient: &MockSCRAMClient{},
  283. scramChallengeResp: "pong",
  284. },
  285. }
  286. for i, test := range testTable {
  287. // mockBroker mocks underlying network logic and broker responses
  288. mockBroker := NewMockBroker(t, 0)
  289. broker := NewBroker(mockBroker.Addr())
  290. // broker executes SASL requests against mockBroker
  291. broker.requestRate = metrics.NilMeter{}
  292. broker.outgoingByteRate = metrics.NilMeter{}
  293. broker.incomingByteRate = metrics.NilMeter{}
  294. broker.requestSize = metrics.NilHistogram{}
  295. broker.responseSize = metrics.NilHistogram{}
  296. broker.responseRate = metrics.NilMeter{}
  297. broker.requestLatency = metrics.NilHistogram{}
  298. broker.requestsInFlight = metrics.NilCounter{}
  299. mockSASLAuthResponse := NewMockSaslAuthenticateResponse(t).SetAuthBytes([]byte(test.scramChallengeResp))
  300. mockSASLHandshakeResponse := NewMockSaslHandshakeResponse(t).SetEnabledMechanisms([]string{SASLTypeSCRAMSHA256, SASLTypeSCRAMSHA512})
  301. if test.mockSASLAuthErr != ErrNoError {
  302. mockSASLAuthResponse = mockSASLAuthResponse.SetError(test.mockSASLAuthErr)
  303. }
  304. if test.mockHandshakeErr != ErrNoError {
  305. mockSASLHandshakeResponse = mockSASLHandshakeResponse.SetError(test.mockHandshakeErr)
  306. }
  307. mockBroker.SetHandlerByMap(map[string]MockResponse{
  308. "SaslAuthenticateRequest": mockSASLAuthResponse,
  309. "SaslHandshakeRequest": mockSASLHandshakeResponse,
  310. })
  311. conf := NewConfig()
  312. conf.Net.SASL.Mechanism = SASLTypeSCRAMSHA512
  313. conf.Net.SASL.SCRAMClientGeneratorFunc = func() SCRAMClient { return test.scramClient }
  314. broker.conf = conf
  315. dialer := net.Dialer{
  316. Timeout: conf.Net.DialTimeout,
  317. KeepAlive: conf.Net.KeepAlive,
  318. LocalAddr: conf.Net.LocalAddr,
  319. }
  320. conn, err := dialer.Dial("tcp", mockBroker.listener.Addr().String())
  321. if err != nil {
  322. t.Fatal(err)
  323. }
  324. broker.conn = conn
  325. err = broker.authenticateViaSASL()
  326. if test.mockSASLAuthErr != ErrNoError {
  327. if test.mockSASLAuthErr != err {
  328. t.Errorf("[%d]:[%s] Expected %s SASL authentication error, got %s\n", i, test.name, test.mockHandshakeErr, err)
  329. }
  330. } else if test.mockHandshakeErr != ErrNoError {
  331. if test.mockHandshakeErr != err {
  332. t.Errorf("[%d]:[%s] Expected %s handshake error, got %s\n", i, test.name, test.mockHandshakeErr, err)
  333. }
  334. } else if test.expectClientErr && err == nil {
  335. t.Errorf("[%d]:[%s] Expected a client error and got none\n", i, test.name)
  336. } else if !test.expectClientErr && err != nil {
  337. t.Errorf("[%d]:[%s] Unexpected error, got %s\n", i, test.name, err)
  338. }
  339. mockBroker.Close()
  340. }
  341. }
  342. func TestSASLPlainAuth(t *testing.T) {
  343. testTable := []struct {
  344. name string
  345. authidentity string
  346. mockAuthErr KError // Mock and expect error returned from SaslAuthenticateRequest
  347. mockHandshakeErr KError // Mock and expect error returned from SaslHandshakeRequest
  348. expectClientErr bool // Expect an internal client-side error
  349. }{
  350. {
  351. name: "SASL Plain OK server response",
  352. mockAuthErr: ErrNoError,
  353. mockHandshakeErr: ErrNoError,
  354. },
  355. {
  356. name: "SASL Plain OK server response with authidentity",
  357. authidentity: "authid",
  358. mockAuthErr: ErrNoError,
  359. mockHandshakeErr: ErrNoError,
  360. },
  361. {
  362. name: "SASL Plain authentication failure response",
  363. mockAuthErr: ErrSASLAuthenticationFailed,
  364. mockHandshakeErr: ErrNoError,
  365. },
  366. {
  367. name: "SASL Plain handshake failure response",
  368. mockAuthErr: ErrNoError,
  369. mockHandshakeErr: ErrSASLAuthenticationFailed,
  370. },
  371. }
  372. for i, test := range testTable {
  373. // mockBroker mocks underlying network logic and broker responses
  374. mockBroker := NewMockBroker(t, 0)
  375. mockSASLAuthResponse := NewMockSaslAuthenticateResponse(t).
  376. SetAuthBytes([]byte(`response_payload`))
  377. if test.mockAuthErr != ErrNoError {
  378. mockSASLAuthResponse = mockSASLAuthResponse.SetError(test.mockAuthErr)
  379. }
  380. mockSASLHandshakeResponse := NewMockSaslHandshakeResponse(t).
  381. SetEnabledMechanisms([]string{SASLTypePlaintext})
  382. if test.mockHandshakeErr != ErrNoError {
  383. mockSASLHandshakeResponse = mockSASLHandshakeResponse.SetError(test.mockHandshakeErr)
  384. }
  385. mockBroker.SetHandlerByMap(map[string]MockResponse{
  386. "SaslAuthenticateRequest": mockSASLAuthResponse,
  387. "SaslHandshakeRequest": mockSASLHandshakeResponse,
  388. })
  389. // broker executes SASL requests against mockBroker
  390. broker := NewBroker(mockBroker.Addr())
  391. broker.requestRate = metrics.NilMeter{}
  392. broker.outgoingByteRate = metrics.NilMeter{}
  393. broker.incomingByteRate = metrics.NilMeter{}
  394. broker.requestSize = metrics.NilHistogram{}
  395. broker.responseSize = metrics.NilHistogram{}
  396. broker.responseRate = metrics.NilMeter{}
  397. broker.requestLatency = metrics.NilHistogram{}
  398. broker.requestsInFlight = metrics.NilCounter{}
  399. conf := NewConfig()
  400. conf.Net.SASL.Mechanism = SASLTypePlaintext
  401. conf.Net.SASL.AuthIdentity = test.authidentity
  402. conf.Net.SASL.User = "token"
  403. conf.Net.SASL.Password = "password"
  404. conf.Net.SASL.Version = SASLHandshakeV1
  405. broker.conf = conf
  406. broker.conf.Version = V1_0_0_0
  407. dialer := net.Dialer{
  408. Timeout: conf.Net.DialTimeout,
  409. KeepAlive: conf.Net.KeepAlive,
  410. LocalAddr: conf.Net.LocalAddr,
  411. }
  412. conn, err := dialer.Dial("tcp", mockBroker.listener.Addr().String())
  413. if err != nil {
  414. t.Fatal(err)
  415. }
  416. broker.conn = conn
  417. err = broker.authenticateViaSASL()
  418. if err == nil {
  419. for _, rr := range mockBroker.History() {
  420. switch r := rr.Request.(type) {
  421. case *SaslAuthenticateRequest:
  422. x := bytes.SplitN(r.SaslAuthBytes, []byte("\x00"), 3)
  423. if string(x[0]) != conf.Net.SASL.AuthIdentity {
  424. t.Errorf("[%d]:[%s] expected %s auth identity, got %s\n", i, test.name, conf.Net.SASL.AuthIdentity, x[0])
  425. }
  426. if string(x[1]) != conf.Net.SASL.User {
  427. t.Errorf("[%d]:[%s] expected %s user, got %s\n", i, test.name, conf.Net.SASL.User, x[1])
  428. }
  429. if string(x[2]) != conf.Net.SASL.Password {
  430. t.Errorf("[%d]:[%s] expected %s password, got %s\n", i, test.name, conf.Net.SASL.Password, x[2])
  431. }
  432. }
  433. }
  434. }
  435. if test.mockAuthErr != ErrNoError {
  436. if test.mockAuthErr != err {
  437. t.Errorf("[%d]:[%s] Expected %s auth error, got %s\n", i, test.name, test.mockAuthErr, err)
  438. }
  439. } else if test.mockHandshakeErr != ErrNoError {
  440. if test.mockHandshakeErr != err {
  441. t.Errorf("[%d]:[%s] Expected %s handshake error, got %s\n", i, test.name, test.mockHandshakeErr, err)
  442. }
  443. } else if test.expectClientErr && err == nil {
  444. t.Errorf("[%d]:[%s] Expected a client error and got none\n", i, test.name)
  445. } else if !test.expectClientErr && err != nil {
  446. t.Errorf("[%d]:[%s] Unexpected error, got %s\n", i, test.name, err)
  447. }
  448. mockBroker.Close()
  449. }
  450. }
  451. // TestSASLReadTimeout ensures that the broker connection won't block forever
  452. // if the remote end never responds after the handshake
  453. func TestSASLReadTimeout(t *testing.T) {
  454. mockBroker := NewMockBroker(t, 0)
  455. defer mockBroker.Close()
  456. mockSASLAuthResponse := NewMockSaslAuthenticateResponse(t).
  457. SetAuthBytes([]byte(`response_payload`))
  458. mockBroker.SetHandlerByMap(map[string]MockResponse{
  459. "SaslAuthenticateRequest": mockSASLAuthResponse,
  460. })
  461. broker := NewBroker(mockBroker.Addr())
  462. {
  463. broker.requestRate = metrics.NilMeter{}
  464. broker.outgoingByteRate = metrics.NilMeter{}
  465. broker.incomingByteRate = metrics.NilMeter{}
  466. broker.requestSize = metrics.NilHistogram{}
  467. broker.responseSize = metrics.NilHistogram{}
  468. broker.responseRate = metrics.NilMeter{}
  469. broker.requestLatency = metrics.NilHistogram{}
  470. broker.requestsInFlight = metrics.NilCounter{}
  471. }
  472. conf := NewConfig()
  473. {
  474. conf.Net.ReadTimeout = time.Millisecond
  475. conf.Net.SASL.Mechanism = SASLTypePlaintext
  476. conf.Net.SASL.User = "token"
  477. conf.Net.SASL.Password = "password"
  478. conf.Net.SASL.Version = SASLHandshakeV1
  479. }
  480. broker.conf = conf
  481. broker.conf.Version = V1_0_0_0
  482. dialer := net.Dialer{}
  483. conn, err := dialer.Dial("tcp", mockBroker.listener.Addr().String())
  484. if err != nil {
  485. t.Fatal(err)
  486. }
  487. broker.conn = conn
  488. err = broker.authenticateViaSASL()
  489. if err == nil {
  490. t.Errorf("should never happen - expected read timeout")
  491. }
  492. }
  493. func TestGSSAPIKerberosAuth_Authorize(t *testing.T) {
  494. testTable := []struct {
  495. name string
  496. error error
  497. mockKerberosClient bool
  498. errorStage string
  499. badResponse bool
  500. badKeyChecksum bool
  501. }{
  502. {
  503. name: "Kerberos authentication success",
  504. error: nil,
  505. mockKerberosClient: true,
  506. },
  507. {
  508. name: "Kerberos login fails",
  509. error: krberror.NewErrorf(krberror.KDCError, "KDC_Error: AS Exchange Error: "+
  510. "kerberos error response from KDC: KRB Error: (24) KDC_ERR_PREAUTH_FAILED Pre-authenti"+
  511. "cation information was invalid - PREAUTH_FAILED"),
  512. mockKerberosClient: true,
  513. errorStage: "login",
  514. },
  515. {
  516. name: "Kerberos service ticket fails",
  517. error: krberror.NewErrorf(krberror.KDCError, "KDC_Error: AS Exchange Error: "+
  518. "kerberos error response from KDC: KRB Error: (24) KDC_ERR_PREAUTH_FAILED Pre-authenti"+
  519. "cation information was invalid - PREAUTH_FAILED"),
  520. mockKerberosClient: true,
  521. errorStage: "service_ticket",
  522. },
  523. {
  524. name: "Kerberos client creation fails",
  525. error: errors.New("configuration file could not be opened: krb5.conf open krb5.conf: no such file or directory"),
  526. },
  527. {
  528. name: "Bad server response, unmarshall key error",
  529. error: errors.New("bytes shorter than header length"),
  530. badResponse: true,
  531. mockKerberosClient: true,
  532. },
  533. {
  534. name: "Bad token checksum",
  535. error: errors.New("checksum mismatch. Computed: 39feb88ac2459f2b77738493, Contained in token: ffffffffffffffff00000000"),
  536. badResponse: false,
  537. badKeyChecksum: true,
  538. mockKerberosClient: true,
  539. },
  540. }
  541. for i, test := range testTable {
  542. mockBroker := NewMockBroker(t, 0)
  543. // broker executes SASL requests against mockBroker
  544. mockBroker.SetGSSAPIHandler(func(bytes []byte) []byte {
  545. return nil
  546. })
  547. broker := NewBroker(mockBroker.Addr())
  548. broker.requestRate = metrics.NilMeter{}
  549. broker.outgoingByteRate = metrics.NilMeter{}
  550. broker.incomingByteRate = metrics.NilMeter{}
  551. broker.requestSize = metrics.NilHistogram{}
  552. broker.responseSize = metrics.NilHistogram{}
  553. broker.responseRate = metrics.NilMeter{}
  554. broker.requestLatency = metrics.NilHistogram{}
  555. broker.requestsInFlight = metrics.NilCounter{}
  556. conf := NewConfig()
  557. conf.Net.SASL.Mechanism = SASLTypeGSSAPI
  558. conf.Net.SASL.GSSAPI.ServiceName = "kafka"
  559. conf.Net.SASL.GSSAPI.KerberosConfigPath = "krb5.conf"
  560. conf.Net.SASL.GSSAPI.Realm = "EXAMPLE.COM"
  561. conf.Net.SASL.GSSAPI.Username = "kafka"
  562. conf.Net.SASL.GSSAPI.Password = "kafka"
  563. conf.Net.SASL.GSSAPI.KeyTabPath = "kafka.keytab"
  564. conf.Net.SASL.GSSAPI.AuthType = KRB5_USER_AUTH
  565. broker.conf = conf
  566. broker.conf.Version = V1_0_0_0
  567. dialer := net.Dialer{
  568. Timeout: conf.Net.DialTimeout,
  569. KeepAlive: conf.Net.KeepAlive,
  570. LocalAddr: conf.Net.LocalAddr,
  571. }
  572. conn, err := dialer.Dial("tcp", mockBroker.listener.Addr().String())
  573. if err != nil {
  574. t.Fatal(err)
  575. }
  576. gssapiHandler := KafkaGSSAPIHandler{
  577. client: &MockKerberosClient{},
  578. badResponse: test.badResponse,
  579. badKeyChecksum: test.badKeyChecksum,
  580. }
  581. mockBroker.SetGSSAPIHandler(gssapiHandler.MockKafkaGSSAPI)
  582. broker.conn = conn
  583. if test.mockKerberosClient {
  584. broker.kerberosAuthenticator.NewKerberosClientFunc = func(config *GSSAPIConfig) (KerberosClient, error) {
  585. return &MockKerberosClient{
  586. mockError: test.error,
  587. errorStage: test.errorStage,
  588. }, nil
  589. }
  590. } else {
  591. broker.kerberosAuthenticator.NewKerberosClientFunc = nil
  592. }
  593. err = broker.authenticateViaSASL()
  594. if err != nil && test.error != nil {
  595. if test.error.Error() != err.Error() {
  596. t.Errorf("[%d] Expected error:%s, got:%s.", i, test.error, err)
  597. }
  598. } else if (err == nil && test.error != nil) || (err != nil && test.error == nil) {
  599. t.Errorf("[%d] Expected error:%s, got:%s.", i, test.error, err)
  600. }
  601. mockBroker.Close()
  602. }
  603. }
  604. func TestBuildClientFirstMessage(t *testing.T) {
  605. testTable := []struct {
  606. name string
  607. token *AccessToken
  608. expected []byte
  609. expectError bool
  610. }{
  611. {
  612. name: "Build SASL client initial response with two extensions",
  613. token: &AccessToken{
  614. Token: "the-token",
  615. Extensions: map[string]string{
  616. "x": "1",
  617. "y": "2",
  618. },
  619. },
  620. expected: []byte("n,,\x01auth=Bearer the-token\x01x=1\x01y=2\x01\x01"),
  621. },
  622. {
  623. name: "Build SASL client initial response with no extensions",
  624. token: &AccessToken{Token: "the-token"},
  625. expected: []byte("n,,\x01auth=Bearer the-token\x01\x01"),
  626. },
  627. {
  628. name: "Build SASL client initial response using reserved extension",
  629. token: &AccessToken{
  630. Token: "the-token",
  631. Extensions: map[string]string{
  632. "auth": "auth-value",
  633. },
  634. },
  635. expected: []byte(""),
  636. expectError: true,
  637. },
  638. }
  639. for i, test := range testTable {
  640. actual, err := buildClientFirstMessage(test.token)
  641. if !reflect.DeepEqual(test.expected, actual) {
  642. t.Errorf("Expected %s, got %s\n", test.expected, actual)
  643. }
  644. if test.expectError && err == nil {
  645. t.Errorf("[%d]:[%s] Expected an error but did not get one", i, test.name)
  646. }
  647. if !test.expectError && err != nil {
  648. t.Errorf("[%d]:[%s] Expected no error but got %s\n", i, test.name, err)
  649. }
  650. }
  651. }
  652. // We're not testing encoding/decoding here, so most of the requests/responses will be empty for simplicity's sake
  653. var brokerTestTable = []struct {
  654. version KafkaVersion
  655. name string
  656. response []byte
  657. runner func(*testing.T, *Broker)
  658. }{
  659. {V0_10_0_0,
  660. "MetadataRequest",
  661. []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  662. func(t *testing.T, broker *Broker) {
  663. request := MetadataRequest{}
  664. response, err := broker.GetMetadata(&request)
  665. if err != nil {
  666. t.Error(err)
  667. }
  668. if response == nil {
  669. t.Error("Metadata request got no response!")
  670. }
  671. }},
  672. {V0_10_0_0,
  673. "ConsumerMetadataRequest",
  674. []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 't', 0x00, 0x00, 0x00, 0x00},
  675. func(t *testing.T, broker *Broker) {
  676. request := ConsumerMetadataRequest{}
  677. response, err := broker.GetConsumerMetadata(&request)
  678. if err != nil {
  679. t.Error(err)
  680. }
  681. if response == nil {
  682. t.Error("Consumer Metadata request got no response!")
  683. }
  684. }},
  685. {V0_10_0_0,
  686. "ProduceRequest (NoResponse)",
  687. []byte{},
  688. func(t *testing.T, broker *Broker) {
  689. request := ProduceRequest{}
  690. request.RequiredAcks = NoResponse
  691. response, err := broker.Produce(&request)
  692. if err != nil {
  693. t.Error(err)
  694. }
  695. if response != nil {
  696. t.Error("Produce request with NoResponse got a response!")
  697. }
  698. }},
  699. {V0_10_0_0,
  700. "ProduceRequest (WaitForLocal)",
  701. []byte{0x00, 0x00, 0x00, 0x00},
  702. func(t *testing.T, broker *Broker) {
  703. request := ProduceRequest{}
  704. request.RequiredAcks = WaitForLocal
  705. response, err := broker.Produce(&request)
  706. if err != nil {
  707. t.Error(err)
  708. }
  709. if response == nil {
  710. t.Error("Produce request without NoResponse got no response!")
  711. }
  712. }},
  713. {V0_10_0_0,
  714. "FetchRequest",
  715. []byte{0x00, 0x00, 0x00, 0x00},
  716. func(t *testing.T, broker *Broker) {
  717. request := FetchRequest{}
  718. response, err := broker.Fetch(&request)
  719. if err != nil {
  720. t.Error(err)
  721. }
  722. if response == nil {
  723. t.Error("Fetch request got no response!")
  724. }
  725. }},
  726. {V0_10_0_0,
  727. "OffsetFetchRequest",
  728. []byte{0x00, 0x00, 0x00, 0x00},
  729. func(t *testing.T, broker *Broker) {
  730. request := OffsetFetchRequest{}
  731. response, err := broker.FetchOffset(&request)
  732. if err != nil {
  733. t.Error(err)
  734. }
  735. if response == nil {
  736. t.Error("OffsetFetch request got no response!")
  737. }
  738. }},
  739. {V0_10_0_0,
  740. "OffsetCommitRequest",
  741. []byte{0x00, 0x00, 0x00, 0x00},
  742. func(t *testing.T, broker *Broker) {
  743. request := OffsetCommitRequest{}
  744. response, err := broker.CommitOffset(&request)
  745. if err != nil {
  746. t.Error(err)
  747. }
  748. if response == nil {
  749. t.Error("OffsetCommit request got no response!")
  750. }
  751. }},
  752. {V0_10_0_0,
  753. "OffsetRequest",
  754. []byte{0x00, 0x00, 0x00, 0x00},
  755. func(t *testing.T, broker *Broker) {
  756. request := OffsetRequest{}
  757. response, err := broker.GetAvailableOffsets(&request)
  758. if err != nil {
  759. t.Error(err)
  760. }
  761. if response == nil {
  762. t.Error("Offset request got no response!")
  763. }
  764. }},
  765. {V0_10_0_0,
  766. "JoinGroupRequest",
  767. []byte{0x00, 0x17, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  768. func(t *testing.T, broker *Broker) {
  769. request := JoinGroupRequest{}
  770. response, err := broker.JoinGroup(&request)
  771. if err != nil {
  772. t.Error(err)
  773. }
  774. if response == nil {
  775. t.Error("JoinGroup request got no response!")
  776. }
  777. }},
  778. {V0_10_0_0,
  779. "SyncGroupRequest",
  780. []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  781. func(t *testing.T, broker *Broker) {
  782. request := SyncGroupRequest{}
  783. response, err := broker.SyncGroup(&request)
  784. if err != nil {
  785. t.Error(err)
  786. }
  787. if response == nil {
  788. t.Error("SyncGroup request got no response!")
  789. }
  790. }},
  791. {V0_10_0_0,
  792. "LeaveGroupRequest",
  793. []byte{0x00, 0x00},
  794. func(t *testing.T, broker *Broker) {
  795. request := LeaveGroupRequest{}
  796. response, err := broker.LeaveGroup(&request)
  797. if err != nil {
  798. t.Error(err)
  799. }
  800. if response == nil {
  801. t.Error("LeaveGroup request got no response!")
  802. }
  803. }},
  804. {V0_10_0_0,
  805. "HeartbeatRequest",
  806. []byte{0x00, 0x00},
  807. func(t *testing.T, broker *Broker) {
  808. request := HeartbeatRequest{}
  809. response, err := broker.Heartbeat(&request)
  810. if err != nil {
  811. t.Error(err)
  812. }
  813. if response == nil {
  814. t.Error("Heartbeat request got no response!")
  815. }
  816. }},
  817. {V0_10_0_0,
  818. "ListGroupsRequest",
  819. []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  820. func(t *testing.T, broker *Broker) {
  821. request := ListGroupsRequest{}
  822. response, err := broker.ListGroups(&request)
  823. if err != nil {
  824. t.Error(err)
  825. }
  826. if response == nil {
  827. t.Error("ListGroups request got no response!")
  828. }
  829. }},
  830. {V0_10_0_0,
  831. "DescribeGroupsRequest",
  832. []byte{0x00, 0x00, 0x00, 0x00},
  833. func(t *testing.T, broker *Broker) {
  834. request := DescribeGroupsRequest{}
  835. response, err := broker.DescribeGroups(&request)
  836. if err != nil {
  837. t.Error(err)
  838. }
  839. if response == nil {
  840. t.Error("DescribeGroups request got no response!")
  841. }
  842. }},
  843. {V0_10_0_0,
  844. "ApiVersionsRequest",
  845. []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  846. func(t *testing.T, broker *Broker) {
  847. request := ApiVersionsRequest{}
  848. response, err := broker.ApiVersions(&request)
  849. if err != nil {
  850. t.Error(err)
  851. }
  852. if response == nil {
  853. t.Error("ApiVersions request got no response!")
  854. }
  855. }},
  856. {V1_1_0_0,
  857. "DeleteGroupsRequest",
  858. []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  859. func(t *testing.T, broker *Broker) {
  860. request := DeleteGroupsRequest{}
  861. response, err := broker.DeleteGroups(&request)
  862. if err != nil {
  863. t.Error(err)
  864. }
  865. if response == nil {
  866. t.Error("DeleteGroups request got no response!")
  867. }
  868. }},
  869. }
  870. func validateBrokerMetrics(t *testing.T, broker *Broker, mockBrokerMetrics brokerMetrics) {
  871. metricValidators := newMetricValidators()
  872. mockBrokerBytesRead := mockBrokerMetrics.bytesRead
  873. mockBrokerBytesWritten := mockBrokerMetrics.bytesWritten
  874. // Check that the number of bytes sent corresponds to what the mock broker received
  875. metricValidators.registerForAllBrokers(broker, countMeterValidator("incoming-byte-rate", mockBrokerBytesWritten))
  876. if mockBrokerBytesWritten == 0 {
  877. // This a ProduceRequest with NoResponse
  878. metricValidators.registerForAllBrokers(broker, countMeterValidator("response-rate", 0))
  879. metricValidators.registerForAllBrokers(broker, countHistogramValidator("response-size", 0))
  880. metricValidators.registerForAllBrokers(broker, minMaxHistogramValidator("response-size", 0, 0))
  881. } else {
  882. metricValidators.registerForAllBrokers(broker, countMeterValidator("response-rate", 1))
  883. metricValidators.registerForAllBrokers(broker, countHistogramValidator("response-size", 1))
  884. metricValidators.registerForAllBrokers(broker, minMaxHistogramValidator("response-size", mockBrokerBytesWritten, mockBrokerBytesWritten))
  885. }
  886. // Check that the number of bytes received corresponds to what the mock broker sent
  887. metricValidators.registerForAllBrokers(broker, countMeterValidator("outgoing-byte-rate", mockBrokerBytesRead))
  888. metricValidators.registerForAllBrokers(broker, countMeterValidator("request-rate", 1))
  889. metricValidators.registerForAllBrokers(broker, countHistogramValidator("request-size", 1))
  890. metricValidators.registerForAllBrokers(broker, minMaxHistogramValidator("request-size", mockBrokerBytesRead, mockBrokerBytesRead))
  891. // Check that there is no more requests in flight
  892. metricValidators.registerForAllBrokers(broker, counterValidator("requests-in-flight", 0))
  893. // Run the validators
  894. metricValidators.run(t, broker.conf.MetricRegistry)
  895. }