broker_test.go 30 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015
  1. package sarama
  2. import (
  3. "bytes"
  4. "errors"
  5. "fmt"
  6. "net"
  7. "reflect"
  8. "testing"
  9. "time"
  10. "github.com/rcrowley/go-metrics"
  11. "gopkg.in/jcmturner/gokrb5.v7/krberror"
  12. )
  13. func ExampleBroker() {
  14. broker := NewBroker("localhost:9092")
  15. err := broker.Open(nil)
  16. if err != nil {
  17. panic(err)
  18. }
  19. request := MetadataRequest{Topics: []string{"myTopic"}}
  20. response, err := broker.GetMetadata(&request)
  21. if err != nil {
  22. _ = broker.Close()
  23. panic(err)
  24. }
  25. fmt.Println("There are", len(response.Topics), "topics active in the cluster.")
  26. if err = broker.Close(); err != nil {
  27. panic(err)
  28. }
  29. }
  30. type mockEncoder struct {
  31. bytes []byte
  32. }
  33. func (m mockEncoder) encode(pe packetEncoder) error {
  34. return pe.putRawBytes(m.bytes)
  35. }
  36. func (m mockEncoder) headerVersion() int16 {
  37. return 0
  38. }
  39. type brokerMetrics struct {
  40. bytesRead int
  41. bytesWritten int
  42. }
  43. func TestBrokerAccessors(t *testing.T) {
  44. broker := NewBroker("abc:123")
  45. if broker.ID() != -1 {
  46. t.Error("New broker didn't have an ID of -1.")
  47. }
  48. if broker.Addr() != "abc:123" {
  49. t.Error("New broker didn't have the correct address")
  50. }
  51. if broker.Rack() != "" {
  52. t.Error("New broker didn't have an unknown rack.")
  53. }
  54. broker.id = 34
  55. if broker.ID() != 34 {
  56. t.Error("Manually setting broker ID did not take effect.")
  57. }
  58. rack := "dc1"
  59. broker.rack = &rack
  60. if broker.Rack() != rack {
  61. t.Error("Manually setting broker rack did not take effect.")
  62. }
  63. }
  64. func TestSimpleBrokerCommunication(t *testing.T) {
  65. for _, tt := range brokerTestTable {
  66. t.Run(tt.name, func(t *testing.T) {
  67. Logger.Printf("Testing broker communication for %s", tt.name)
  68. mb := NewMockBroker(t, 0)
  69. mb.Returns(&mockEncoder{tt.response})
  70. pendingNotify := make(chan brokerMetrics)
  71. // Register a callback to be notified about successful requests
  72. mb.SetNotifier(func(bytesRead, bytesWritten int) {
  73. pendingNotify <- brokerMetrics{bytesRead, bytesWritten}
  74. })
  75. broker := NewBroker(mb.Addr())
  76. // Set the broker id in order to validate local broker metrics
  77. broker.id = 0
  78. conf := NewConfig()
  79. conf.Version = tt.version
  80. err := broker.Open(conf)
  81. if err != nil {
  82. t.Fatal(err)
  83. }
  84. tt.runner(t, broker)
  85. // Wait up to 500 ms for the remote broker to process the request and
  86. // notify us about the metrics
  87. timeout := 500 * time.Millisecond
  88. select {
  89. case mockBrokerMetrics := <-pendingNotify:
  90. validateBrokerMetrics(t, broker, mockBrokerMetrics)
  91. case <-time.After(timeout):
  92. t.Errorf("No request received for: %s after waiting for %v", tt.name, timeout)
  93. }
  94. mb.Close()
  95. err = broker.Close()
  96. if err != nil {
  97. t.Error(err)
  98. }
  99. })
  100. }
  101. }
  102. var ErrTokenFailure = errors.New("Failure generating token")
  103. type TokenProvider struct {
  104. accessToken *AccessToken
  105. err error
  106. }
  107. func (t *TokenProvider) Token() (*AccessToken, error) {
  108. return t.accessToken, t.err
  109. }
  110. func newTokenProvider(token *AccessToken, err error) *TokenProvider {
  111. return &TokenProvider{
  112. accessToken: token,
  113. err: err,
  114. }
  115. }
  116. func TestSASLOAuthBearer(t *testing.T) {
  117. testTable := []struct {
  118. name string
  119. authidentity string
  120. mockSASLHandshakeResponse MockResponse // Mock SaslHandshakeRequest response from broker
  121. mockSASLAuthResponse MockResponse // Mock SaslAuthenticateRequest response from broker
  122. expectClientErr bool // Expect an internal client-side error
  123. expectedBrokerError KError // Expected Kafka error returned by client
  124. tokProvider *TokenProvider
  125. }{
  126. {
  127. name: "SASL/OAUTHBEARER OK server response",
  128. mockSASLHandshakeResponse: NewMockSaslHandshakeResponse(t).
  129. SetEnabledMechanisms([]string{SASLTypeOAuth}),
  130. mockSASLAuthResponse: NewMockSaslAuthenticateResponse(t),
  131. expectClientErr: false,
  132. expectedBrokerError: ErrNoError,
  133. tokProvider: newTokenProvider(&AccessToken{Token: "access-token-123"}, nil),
  134. },
  135. {
  136. name: "SASL/OAUTHBEARER authentication failure response",
  137. mockSASLHandshakeResponse: NewMockSaslHandshakeResponse(t).
  138. SetEnabledMechanisms([]string{SASLTypeOAuth}),
  139. mockSASLAuthResponse: NewMockSequence(
  140. // First, the broker response with a challenge
  141. NewMockSaslAuthenticateResponse(t).
  142. SetAuthBytes([]byte(`{"status":"invalid_request1"}`)),
  143. // Next, the client terminates the token exchange. Finally, the
  144. // broker responds with an error message.
  145. NewMockSaslAuthenticateResponse(t).
  146. SetAuthBytes([]byte(`{"status":"invalid_request2"}`)).
  147. SetError(ErrSASLAuthenticationFailed),
  148. ),
  149. expectClientErr: true,
  150. expectedBrokerError: ErrSASLAuthenticationFailed,
  151. tokProvider: newTokenProvider(&AccessToken{Token: "access-token-123"}, nil),
  152. },
  153. {
  154. name: "SASL/OAUTHBEARER handshake failure response",
  155. mockSASLHandshakeResponse: NewMockSaslHandshakeResponse(t).
  156. SetEnabledMechanisms([]string{SASLTypeOAuth}).
  157. SetError(ErrSASLAuthenticationFailed),
  158. mockSASLAuthResponse: NewMockSaslAuthenticateResponse(t),
  159. expectClientErr: true,
  160. expectedBrokerError: ErrSASLAuthenticationFailed,
  161. tokProvider: newTokenProvider(&AccessToken{Token: "access-token-123"}, nil),
  162. },
  163. {
  164. name: "SASL/OAUTHBEARER token generation error",
  165. mockSASLHandshakeResponse: NewMockSaslHandshakeResponse(t).
  166. SetEnabledMechanisms([]string{SASLTypeOAuth}),
  167. mockSASLAuthResponse: NewMockSaslAuthenticateResponse(t),
  168. expectClientErr: true,
  169. expectedBrokerError: ErrNoError,
  170. tokProvider: newTokenProvider(&AccessToken{Token: "access-token-123"}, ErrTokenFailure),
  171. },
  172. {
  173. name: "SASL/OAUTHBEARER invalid extension",
  174. mockSASLHandshakeResponse: NewMockSaslHandshakeResponse(t).
  175. SetEnabledMechanisms([]string{SASLTypeOAuth}),
  176. mockSASLAuthResponse: NewMockSaslAuthenticateResponse(t),
  177. expectClientErr: true,
  178. expectedBrokerError: ErrNoError,
  179. tokProvider: newTokenProvider(&AccessToken{
  180. Token: "access-token-123",
  181. Extensions: map[string]string{"auth": "auth-value"},
  182. }, nil),
  183. },
  184. }
  185. for i, test := range testTable {
  186. t.Run(test.name, func(t *testing.T) {
  187. // mockBroker mocks underlying network logic and broker responses
  188. mockBroker := NewMockBroker(t, 0)
  189. mockBroker.SetHandlerByMap(map[string]MockResponse{
  190. "SaslAuthenticateRequest": test.mockSASLAuthResponse,
  191. "SaslHandshakeRequest": test.mockSASLHandshakeResponse,
  192. })
  193. // broker executes SASL requests against mockBroker
  194. broker := NewBroker(mockBroker.Addr())
  195. broker.requestRate = metrics.NilMeter{}
  196. broker.outgoingByteRate = metrics.NilMeter{}
  197. broker.incomingByteRate = metrics.NilMeter{}
  198. broker.requestSize = metrics.NilHistogram{}
  199. broker.responseSize = metrics.NilHistogram{}
  200. broker.responseRate = metrics.NilMeter{}
  201. broker.requestLatency = metrics.NilHistogram{}
  202. broker.requestsInFlight = metrics.NilCounter{}
  203. conf := NewConfig()
  204. conf.Net.SASL.Mechanism = SASLTypeOAuth
  205. conf.Net.SASL.TokenProvider = test.tokProvider
  206. broker.conf = conf
  207. dialer := net.Dialer{
  208. Timeout: conf.Net.DialTimeout,
  209. KeepAlive: conf.Net.KeepAlive,
  210. LocalAddr: conf.Net.LocalAddr,
  211. }
  212. conn, err := dialer.Dial("tcp", mockBroker.listener.Addr().String())
  213. if err != nil {
  214. t.Fatal(err)
  215. }
  216. broker.conn = conn
  217. err = broker.authenticateViaSASL()
  218. if test.expectedBrokerError != ErrNoError {
  219. if test.expectedBrokerError != err {
  220. t.Errorf("[%d]:[%s] Expected %s auth error, got %s\n", i, test.name, test.expectedBrokerError, err)
  221. }
  222. } else if test.expectClientErr && err == nil {
  223. t.Errorf("[%d]:[%s] Expected a client error and got none\n", i, test.name)
  224. } else if !test.expectClientErr && err != nil {
  225. t.Errorf("[%d]:[%s] Unexpected error, got %s\n", i, test.name, err)
  226. }
  227. mockBroker.Close()
  228. })
  229. }
  230. }
  231. // A mock scram client.
  232. type MockSCRAMClient struct {
  233. done bool
  234. }
  235. func (m *MockSCRAMClient) Begin(_, _, _ string) (err error) {
  236. return nil
  237. }
  238. func (m *MockSCRAMClient) Step(challenge string) (response string, err error) {
  239. if challenge == "" {
  240. return "ping", nil
  241. }
  242. if challenge == "pong" {
  243. m.done = true
  244. return "", nil
  245. }
  246. return "", errors.New("failed to authenticate :(")
  247. }
  248. func (m *MockSCRAMClient) Done() bool {
  249. return m.done
  250. }
  251. var _ SCRAMClient = &MockSCRAMClient{}
  252. func TestSASLSCRAMSHAXXX(t *testing.T) {
  253. testTable := []struct {
  254. name string
  255. mockHandshakeErr KError
  256. mockSASLAuthErr KError
  257. expectClientErr bool
  258. scramClient *MockSCRAMClient
  259. scramChallengeResp string
  260. }{
  261. {
  262. name: "SASL/SCRAMSHAXXX successful authentication",
  263. mockHandshakeErr: ErrNoError,
  264. scramClient: &MockSCRAMClient{},
  265. scramChallengeResp: "pong",
  266. },
  267. {
  268. name: "SASL/SCRAMSHAXXX SCRAM client step error client",
  269. mockHandshakeErr: ErrNoError,
  270. mockSASLAuthErr: ErrNoError,
  271. scramClient: &MockSCRAMClient{},
  272. scramChallengeResp: "gong",
  273. expectClientErr: true,
  274. },
  275. {
  276. name: "SASL/SCRAMSHAXXX server authentication error",
  277. mockHandshakeErr: ErrNoError,
  278. mockSASLAuthErr: ErrSASLAuthenticationFailed,
  279. scramClient: &MockSCRAMClient{},
  280. scramChallengeResp: "pong",
  281. },
  282. {
  283. name: "SASL/SCRAMSHAXXX unsupported SCRAM mechanism",
  284. mockHandshakeErr: ErrUnsupportedSASLMechanism,
  285. mockSASLAuthErr: ErrNoError,
  286. scramClient: &MockSCRAMClient{},
  287. scramChallengeResp: "pong",
  288. },
  289. }
  290. for i, test := range testTable {
  291. t.Run(test.name, func(t *testing.T) {
  292. // mockBroker mocks underlying network logic and broker responses
  293. mockBroker := NewMockBroker(t, 0)
  294. broker := NewBroker(mockBroker.Addr())
  295. // broker executes SASL requests against mockBroker
  296. broker.requestRate = metrics.NilMeter{}
  297. broker.outgoingByteRate = metrics.NilMeter{}
  298. broker.incomingByteRate = metrics.NilMeter{}
  299. broker.requestSize = metrics.NilHistogram{}
  300. broker.responseSize = metrics.NilHistogram{}
  301. broker.responseRate = metrics.NilMeter{}
  302. broker.requestLatency = metrics.NilHistogram{}
  303. broker.requestsInFlight = metrics.NilCounter{}
  304. mockSASLAuthResponse := NewMockSaslAuthenticateResponse(t).SetAuthBytes([]byte(test.scramChallengeResp))
  305. mockSASLHandshakeResponse := NewMockSaslHandshakeResponse(t).SetEnabledMechanisms([]string{SASLTypeSCRAMSHA256, SASLTypeSCRAMSHA512})
  306. if test.mockSASLAuthErr != ErrNoError {
  307. mockSASLAuthResponse = mockSASLAuthResponse.SetError(test.mockSASLAuthErr)
  308. }
  309. if test.mockHandshakeErr != ErrNoError {
  310. mockSASLHandshakeResponse = mockSASLHandshakeResponse.SetError(test.mockHandshakeErr)
  311. }
  312. mockBroker.SetHandlerByMap(map[string]MockResponse{
  313. "SaslAuthenticateRequest": mockSASLAuthResponse,
  314. "SaslHandshakeRequest": mockSASLHandshakeResponse,
  315. })
  316. conf := NewConfig()
  317. conf.Net.SASL.Mechanism = SASLTypeSCRAMSHA512
  318. conf.Net.SASL.SCRAMClientGeneratorFunc = func() SCRAMClient { return test.scramClient }
  319. broker.conf = conf
  320. dialer := net.Dialer{
  321. Timeout: conf.Net.DialTimeout,
  322. KeepAlive: conf.Net.KeepAlive,
  323. LocalAddr: conf.Net.LocalAddr,
  324. }
  325. conn, err := dialer.Dial("tcp", mockBroker.listener.Addr().String())
  326. if err != nil {
  327. t.Fatal(err)
  328. }
  329. broker.conn = conn
  330. err = broker.authenticateViaSASL()
  331. if test.mockSASLAuthErr != ErrNoError {
  332. if test.mockSASLAuthErr != err {
  333. t.Errorf("[%d]:[%s] Expected %s SASL authentication error, got %s\n", i, test.name, test.mockHandshakeErr, err)
  334. }
  335. } else if test.mockHandshakeErr != ErrNoError {
  336. if test.mockHandshakeErr != err {
  337. t.Errorf("[%d]:[%s] Expected %s handshake error, got %s\n", i, test.name, test.mockHandshakeErr, err)
  338. }
  339. } else if test.expectClientErr && err == nil {
  340. t.Errorf("[%d]:[%s] Expected a client error and got none\n", i, test.name)
  341. } else if !test.expectClientErr && err != nil {
  342. t.Errorf("[%d]:[%s] Unexpected error, got %s\n", i, test.name, err)
  343. }
  344. mockBroker.Close()
  345. })
  346. }
  347. }
  348. func TestSASLPlainAuth(t *testing.T) {
  349. testTable := []struct {
  350. name string
  351. authidentity string
  352. mockAuthErr KError // Mock and expect error returned from SaslAuthenticateRequest
  353. mockHandshakeErr KError // Mock and expect error returned from SaslHandshakeRequest
  354. expectClientErr bool // Expect an internal client-side error
  355. }{
  356. {
  357. name: "SASL Plain OK server response",
  358. mockAuthErr: ErrNoError,
  359. mockHandshakeErr: ErrNoError,
  360. },
  361. {
  362. name: "SASL Plain OK server response with authidentity",
  363. authidentity: "authid",
  364. mockAuthErr: ErrNoError,
  365. mockHandshakeErr: ErrNoError,
  366. },
  367. {
  368. name: "SASL Plain authentication failure response",
  369. mockAuthErr: ErrSASLAuthenticationFailed,
  370. mockHandshakeErr: ErrNoError,
  371. },
  372. {
  373. name: "SASL Plain handshake failure response",
  374. mockAuthErr: ErrNoError,
  375. mockHandshakeErr: ErrSASLAuthenticationFailed,
  376. },
  377. }
  378. for i, test := range testTable {
  379. t.Run(test.name, func(t *testing.T) {
  380. // mockBroker mocks underlying network logic and broker responses
  381. mockBroker := NewMockBroker(t, 0)
  382. mockSASLAuthResponse := NewMockSaslAuthenticateResponse(t).
  383. SetAuthBytes([]byte(`response_payload`))
  384. if test.mockAuthErr != ErrNoError {
  385. mockSASLAuthResponse = mockSASLAuthResponse.SetError(test.mockAuthErr)
  386. }
  387. mockSASLHandshakeResponse := NewMockSaslHandshakeResponse(t).
  388. SetEnabledMechanisms([]string{SASLTypePlaintext})
  389. if test.mockHandshakeErr != ErrNoError {
  390. mockSASLHandshakeResponse = mockSASLHandshakeResponse.SetError(test.mockHandshakeErr)
  391. }
  392. mockBroker.SetHandlerByMap(map[string]MockResponse{
  393. "SaslAuthenticateRequest": mockSASLAuthResponse,
  394. "SaslHandshakeRequest": mockSASLHandshakeResponse,
  395. })
  396. // broker executes SASL requests against mockBroker
  397. broker := NewBroker(mockBroker.Addr())
  398. broker.requestRate = metrics.NilMeter{}
  399. broker.outgoingByteRate = metrics.NilMeter{}
  400. broker.incomingByteRate = metrics.NilMeter{}
  401. broker.requestSize = metrics.NilHistogram{}
  402. broker.responseSize = metrics.NilHistogram{}
  403. broker.responseRate = metrics.NilMeter{}
  404. broker.requestLatency = metrics.NilHistogram{}
  405. broker.requestsInFlight = metrics.NilCounter{}
  406. conf := NewConfig()
  407. conf.Net.SASL.Mechanism = SASLTypePlaintext
  408. conf.Net.SASL.AuthIdentity = test.authidentity
  409. conf.Net.SASL.User = "token"
  410. conf.Net.SASL.Password = "password"
  411. conf.Net.SASL.Version = SASLHandshakeV1
  412. broker.conf = conf
  413. broker.conf.Version = V1_0_0_0
  414. dialer := net.Dialer{
  415. Timeout: conf.Net.DialTimeout,
  416. KeepAlive: conf.Net.KeepAlive,
  417. LocalAddr: conf.Net.LocalAddr,
  418. }
  419. conn, err := dialer.Dial("tcp", mockBroker.listener.Addr().String())
  420. if err != nil {
  421. t.Fatal(err)
  422. }
  423. broker.conn = conn
  424. err = broker.authenticateViaSASL()
  425. if err == nil {
  426. for _, rr := range mockBroker.History() {
  427. switch r := rr.Request.(type) {
  428. case *SaslAuthenticateRequest:
  429. x := bytes.SplitN(r.SaslAuthBytes, []byte("\x00"), 3)
  430. if string(x[0]) != conf.Net.SASL.AuthIdentity {
  431. t.Errorf("[%d]:[%s] expected %s auth identity, got %s\n", i, test.name, conf.Net.SASL.AuthIdentity, x[0])
  432. }
  433. if string(x[1]) != conf.Net.SASL.User {
  434. t.Errorf("[%d]:[%s] expected %s user, got %s\n", i, test.name, conf.Net.SASL.User, x[1])
  435. }
  436. if string(x[2]) != conf.Net.SASL.Password {
  437. t.Errorf("[%d]:[%s] expected %s password, got %s\n", i, test.name, conf.Net.SASL.Password, x[2])
  438. }
  439. }
  440. }
  441. }
  442. if test.mockAuthErr != ErrNoError {
  443. if test.mockAuthErr != err {
  444. t.Errorf("[%d]:[%s] Expected %s auth error, got %s\n", i, test.name, test.mockAuthErr, err)
  445. }
  446. } else if test.mockHandshakeErr != ErrNoError {
  447. if test.mockHandshakeErr != err {
  448. t.Errorf("[%d]:[%s] Expected %s handshake error, got %s\n", i, test.name, test.mockHandshakeErr, err)
  449. }
  450. } else if test.expectClientErr && err == nil {
  451. t.Errorf("[%d]:[%s] Expected a client error and got none\n", i, test.name)
  452. } else if !test.expectClientErr && err != nil {
  453. t.Errorf("[%d]:[%s] Unexpected error, got %s\n", i, test.name, err)
  454. }
  455. mockBroker.Close()
  456. })
  457. }
  458. }
  459. // TestSASLReadTimeout ensures that the broker connection won't block forever
  460. // if the remote end never responds after the handshake
  461. func TestSASLReadTimeout(t *testing.T) {
  462. mockBroker := NewMockBroker(t, 0)
  463. defer mockBroker.Close()
  464. mockSASLAuthResponse := NewMockSaslAuthenticateResponse(t).
  465. SetAuthBytes([]byte(`response_payload`))
  466. mockBroker.SetHandlerByMap(map[string]MockResponse{
  467. "SaslAuthenticateRequest": mockSASLAuthResponse,
  468. })
  469. broker := NewBroker(mockBroker.Addr())
  470. {
  471. broker.requestRate = metrics.NilMeter{}
  472. broker.outgoingByteRate = metrics.NilMeter{}
  473. broker.incomingByteRate = metrics.NilMeter{}
  474. broker.requestSize = metrics.NilHistogram{}
  475. broker.responseSize = metrics.NilHistogram{}
  476. broker.responseRate = metrics.NilMeter{}
  477. broker.requestLatency = metrics.NilHistogram{}
  478. broker.requestsInFlight = metrics.NilCounter{}
  479. }
  480. conf := NewConfig()
  481. {
  482. conf.Net.ReadTimeout = time.Millisecond
  483. conf.Net.SASL.Mechanism = SASLTypePlaintext
  484. conf.Net.SASL.User = "token"
  485. conf.Net.SASL.Password = "password"
  486. conf.Net.SASL.Version = SASLHandshakeV1
  487. }
  488. broker.conf = conf
  489. broker.conf.Version = V1_0_0_0
  490. dialer := net.Dialer{}
  491. conn, err := dialer.Dial("tcp", mockBroker.listener.Addr().String())
  492. if err != nil {
  493. t.Fatal(err)
  494. }
  495. broker.conn = conn
  496. err = broker.authenticateViaSASL()
  497. if err == nil {
  498. t.Errorf("should never happen - expected read timeout")
  499. }
  500. }
  501. func TestGSSAPIKerberosAuth_Authorize(t *testing.T) {
  502. testTable := []struct {
  503. name string
  504. error error
  505. mockKerberosClient bool
  506. errorStage string
  507. badResponse bool
  508. badKeyChecksum bool
  509. }{
  510. {
  511. name: "Kerberos authentication success",
  512. error: nil,
  513. mockKerberosClient: true,
  514. },
  515. {
  516. name: "Kerberos login fails",
  517. error: krberror.NewErrorf(krberror.KDCError, "KDC_Error: AS Exchange Error: "+
  518. "kerberos error response from KDC: KRB Error: (24) KDC_ERR_PREAUTH_FAILED Pre-authenti"+
  519. "cation information was invalid - PREAUTH_FAILED"),
  520. mockKerberosClient: true,
  521. errorStage: "login",
  522. },
  523. {
  524. name: "Kerberos service ticket fails",
  525. error: krberror.NewErrorf(krberror.KDCError, "KDC_Error: AS Exchange Error: "+
  526. "kerberos error response from KDC: KRB Error: (24) KDC_ERR_PREAUTH_FAILED Pre-authenti"+
  527. "cation information was invalid - PREAUTH_FAILED"),
  528. mockKerberosClient: true,
  529. errorStage: "service_ticket",
  530. },
  531. {
  532. name: "Kerberos client creation fails",
  533. error: errors.New("configuration file could not be opened: krb5.conf open krb5.conf: no such file or directory"),
  534. },
  535. {
  536. name: "Bad server response, unmarshall key error",
  537. error: errors.New("bytes shorter than header length"),
  538. badResponse: true,
  539. mockKerberosClient: true,
  540. },
  541. {
  542. name: "Bad token checksum",
  543. error: errors.New("checksum mismatch. Computed: 39feb88ac2459f2b77738493, Contained in token: ffffffffffffffff00000000"),
  544. badResponse: false,
  545. badKeyChecksum: true,
  546. mockKerberosClient: true,
  547. },
  548. }
  549. for i, test := range testTable {
  550. t.Run(test.name, func(t *testing.T) {
  551. mockBroker := NewMockBroker(t, 0)
  552. // broker executes SASL requests against mockBroker
  553. mockBroker.SetGSSAPIHandler(func(bytes []byte) []byte {
  554. return nil
  555. })
  556. broker := NewBroker(mockBroker.Addr())
  557. broker.requestRate = metrics.NilMeter{}
  558. broker.outgoingByteRate = metrics.NilMeter{}
  559. broker.incomingByteRate = metrics.NilMeter{}
  560. broker.requestSize = metrics.NilHistogram{}
  561. broker.responseSize = metrics.NilHistogram{}
  562. broker.responseRate = metrics.NilMeter{}
  563. broker.requestLatency = metrics.NilHistogram{}
  564. broker.requestsInFlight = metrics.NilCounter{}
  565. conf := NewConfig()
  566. conf.Net.SASL.Mechanism = SASLTypeGSSAPI
  567. conf.Net.SASL.GSSAPI.ServiceName = "kafka"
  568. conf.Net.SASL.GSSAPI.KerberosConfigPath = "krb5.conf"
  569. conf.Net.SASL.GSSAPI.Realm = "EXAMPLE.COM"
  570. conf.Net.SASL.GSSAPI.Username = "kafka"
  571. conf.Net.SASL.GSSAPI.Password = "kafka"
  572. conf.Net.SASL.GSSAPI.KeyTabPath = "kafka.keytab"
  573. conf.Net.SASL.GSSAPI.AuthType = KRB5_USER_AUTH
  574. broker.conf = conf
  575. broker.conf.Version = V1_0_0_0
  576. dialer := net.Dialer{
  577. Timeout: conf.Net.DialTimeout,
  578. KeepAlive: conf.Net.KeepAlive,
  579. LocalAddr: conf.Net.LocalAddr,
  580. }
  581. conn, err := dialer.Dial("tcp", mockBroker.listener.Addr().String())
  582. if err != nil {
  583. t.Fatal(err)
  584. }
  585. gssapiHandler := KafkaGSSAPIHandler{
  586. client: &MockKerberosClient{},
  587. badResponse: test.badResponse,
  588. badKeyChecksum: test.badKeyChecksum,
  589. }
  590. mockBroker.SetGSSAPIHandler(gssapiHandler.MockKafkaGSSAPI)
  591. broker.conn = conn
  592. if test.mockKerberosClient {
  593. broker.kerberosAuthenticator.NewKerberosClientFunc = func(config *GSSAPIConfig) (KerberosClient, error) {
  594. return &MockKerberosClient{
  595. mockError: test.error,
  596. errorStage: test.errorStage,
  597. }, nil
  598. }
  599. } else {
  600. broker.kerberosAuthenticator.NewKerberosClientFunc = nil
  601. }
  602. err = broker.authenticateViaSASL()
  603. if err != nil && test.error != nil {
  604. if test.error.Error() != err.Error() {
  605. t.Errorf("[%d] Expected error:%s, got:%s.", i, test.error, err)
  606. }
  607. } else if (err == nil && test.error != nil) || (err != nil && test.error == nil) {
  608. t.Errorf("[%d] Expected error:%s, got:%s.", i, test.error, err)
  609. }
  610. mockBroker.Close()
  611. })
  612. }
  613. }
  614. func TestBuildClientFirstMessage(t *testing.T) {
  615. testTable := []struct {
  616. name string
  617. token *AccessToken
  618. expected []byte
  619. expectError bool
  620. }{
  621. {
  622. name: "Build SASL client initial response with two extensions",
  623. token: &AccessToken{
  624. Token: "the-token",
  625. Extensions: map[string]string{
  626. "x": "1",
  627. "y": "2",
  628. },
  629. },
  630. expected: []byte("n,,\x01auth=Bearer the-token\x01x=1\x01y=2\x01\x01"),
  631. },
  632. {
  633. name: "Build SASL client initial response with no extensions",
  634. token: &AccessToken{Token: "the-token"},
  635. expected: []byte("n,,\x01auth=Bearer the-token\x01\x01"),
  636. },
  637. {
  638. name: "Build SASL client initial response using reserved extension",
  639. token: &AccessToken{
  640. Token: "the-token",
  641. Extensions: map[string]string{
  642. "auth": "auth-value",
  643. },
  644. },
  645. expected: []byte(""),
  646. expectError: true,
  647. },
  648. }
  649. for i, test := range testTable {
  650. t.Run(test.name, func(t *testing.T) {
  651. actual, err := buildClientFirstMessage(test.token)
  652. if !reflect.DeepEqual(test.expected, actual) {
  653. t.Errorf("Expected %s, got %s\n", test.expected, actual)
  654. }
  655. if test.expectError && err == nil {
  656. t.Errorf("[%d]:[%s] Expected an error but did not get one", i, test.name)
  657. }
  658. if !test.expectError && err != nil {
  659. t.Errorf("[%d]:[%s] Expected no error but got %s\n", i, test.name, err)
  660. }
  661. })
  662. }
  663. }
  664. // We're not testing encoding/decoding here, so most of the requests/responses will be empty for simplicity's sake
  665. var brokerTestTable = []struct {
  666. version KafkaVersion
  667. name string
  668. response []byte
  669. runner func(*testing.T, *Broker)
  670. }{
  671. {V0_10_0_0,
  672. "MetadataRequest",
  673. []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  674. func(t *testing.T, broker *Broker) {
  675. request := MetadataRequest{}
  676. response, err := broker.GetMetadata(&request)
  677. if err != nil {
  678. t.Error(err)
  679. }
  680. if response == nil {
  681. t.Error("Metadata request got no response!")
  682. }
  683. }},
  684. {V0_10_0_0,
  685. "ConsumerMetadataRequest",
  686. []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 't', 0x00, 0x00, 0x00, 0x00},
  687. func(t *testing.T, broker *Broker) {
  688. request := ConsumerMetadataRequest{}
  689. response, err := broker.GetConsumerMetadata(&request)
  690. if err != nil {
  691. t.Error(err)
  692. }
  693. if response == nil {
  694. t.Error("Consumer Metadata request got no response!")
  695. }
  696. }},
  697. {V0_10_0_0,
  698. "ProduceRequest (NoResponse)",
  699. []byte{},
  700. func(t *testing.T, broker *Broker) {
  701. request := ProduceRequest{}
  702. request.RequiredAcks = NoResponse
  703. response, err := broker.Produce(&request)
  704. if err != nil {
  705. t.Error(err)
  706. }
  707. if response != nil {
  708. t.Error("Produce request with NoResponse got a response!")
  709. }
  710. }},
  711. {V0_10_0_0,
  712. "ProduceRequest (WaitForLocal)",
  713. []byte{0x00, 0x00, 0x00, 0x00},
  714. func(t *testing.T, broker *Broker) {
  715. request := ProduceRequest{}
  716. request.RequiredAcks = WaitForLocal
  717. response, err := broker.Produce(&request)
  718. if err != nil {
  719. t.Error(err)
  720. }
  721. if response == nil {
  722. t.Error("Produce request without NoResponse got no response!")
  723. }
  724. }},
  725. {V0_10_0_0,
  726. "FetchRequest",
  727. []byte{0x00, 0x00, 0x00, 0x00},
  728. func(t *testing.T, broker *Broker) {
  729. request := FetchRequest{}
  730. response, err := broker.Fetch(&request)
  731. if err != nil {
  732. t.Error(err)
  733. }
  734. if response == nil {
  735. t.Error("Fetch request got no response!")
  736. }
  737. }},
  738. {V0_10_0_0,
  739. "OffsetFetchRequest",
  740. []byte{0x00, 0x00, 0x00, 0x00},
  741. func(t *testing.T, broker *Broker) {
  742. request := OffsetFetchRequest{}
  743. response, err := broker.FetchOffset(&request)
  744. if err != nil {
  745. t.Error(err)
  746. }
  747. if response == nil {
  748. t.Error("OffsetFetch request got no response!")
  749. }
  750. }},
  751. {V0_10_0_0,
  752. "OffsetCommitRequest",
  753. []byte{0x00, 0x00, 0x00, 0x00},
  754. func(t *testing.T, broker *Broker) {
  755. request := OffsetCommitRequest{}
  756. response, err := broker.CommitOffset(&request)
  757. if err != nil {
  758. t.Error(err)
  759. }
  760. if response == nil {
  761. t.Error("OffsetCommit request got no response!")
  762. }
  763. }},
  764. {V0_10_0_0,
  765. "OffsetRequest",
  766. []byte{0x00, 0x00, 0x00, 0x00},
  767. func(t *testing.T, broker *Broker) {
  768. request := OffsetRequest{}
  769. response, err := broker.GetAvailableOffsets(&request)
  770. if err != nil {
  771. t.Error(err)
  772. }
  773. if response == nil {
  774. t.Error("Offset request got no response!")
  775. }
  776. }},
  777. {V0_10_0_0,
  778. "JoinGroupRequest",
  779. []byte{0x00, 0x17, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  780. func(t *testing.T, broker *Broker) {
  781. request := JoinGroupRequest{}
  782. response, err := broker.JoinGroup(&request)
  783. if err != nil {
  784. t.Error(err)
  785. }
  786. if response == nil {
  787. t.Error("JoinGroup request got no response!")
  788. }
  789. }},
  790. {V0_10_0_0,
  791. "SyncGroupRequest",
  792. []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  793. func(t *testing.T, broker *Broker) {
  794. request := SyncGroupRequest{}
  795. response, err := broker.SyncGroup(&request)
  796. if err != nil {
  797. t.Error(err)
  798. }
  799. if response == nil {
  800. t.Error("SyncGroup request got no response!")
  801. }
  802. }},
  803. {V0_10_0_0,
  804. "LeaveGroupRequest",
  805. []byte{0x00, 0x00},
  806. func(t *testing.T, broker *Broker) {
  807. request := LeaveGroupRequest{}
  808. response, err := broker.LeaveGroup(&request)
  809. if err != nil {
  810. t.Error(err)
  811. }
  812. if response == nil {
  813. t.Error("LeaveGroup request got no response!")
  814. }
  815. }},
  816. {V0_10_0_0,
  817. "HeartbeatRequest",
  818. []byte{0x00, 0x00},
  819. func(t *testing.T, broker *Broker) {
  820. request := HeartbeatRequest{}
  821. response, err := broker.Heartbeat(&request)
  822. if err != nil {
  823. t.Error(err)
  824. }
  825. if response == nil {
  826. t.Error("Heartbeat request got no response!")
  827. }
  828. }},
  829. {V0_10_0_0,
  830. "ListGroupsRequest",
  831. []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  832. func(t *testing.T, broker *Broker) {
  833. request := ListGroupsRequest{}
  834. response, err := broker.ListGroups(&request)
  835. if err != nil {
  836. t.Error(err)
  837. }
  838. if response == nil {
  839. t.Error("ListGroups request got no response!")
  840. }
  841. }},
  842. {V0_10_0_0,
  843. "DescribeGroupsRequest",
  844. []byte{0x00, 0x00, 0x00, 0x00},
  845. func(t *testing.T, broker *Broker) {
  846. request := DescribeGroupsRequest{}
  847. response, err := broker.DescribeGroups(&request)
  848. if err != nil {
  849. t.Error(err)
  850. }
  851. if response == nil {
  852. t.Error("DescribeGroups request got no response!")
  853. }
  854. }},
  855. {V0_10_0_0,
  856. "ApiVersionsRequest",
  857. []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  858. func(t *testing.T, broker *Broker) {
  859. request := ApiVersionsRequest{}
  860. response, err := broker.ApiVersions(&request)
  861. if err != nil {
  862. t.Error(err)
  863. }
  864. if response == nil {
  865. t.Error("ApiVersions request got no response!")
  866. }
  867. }},
  868. {V1_1_0_0,
  869. "DeleteGroupsRequest",
  870. []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  871. func(t *testing.T, broker *Broker) {
  872. request := DeleteGroupsRequest{}
  873. response, err := broker.DeleteGroups(&request)
  874. if err != nil {
  875. t.Error(err)
  876. }
  877. if response == nil {
  878. t.Error("DeleteGroups request got no response!")
  879. }
  880. }},
  881. }
  882. func validateBrokerMetrics(t *testing.T, broker *Broker, mockBrokerMetrics brokerMetrics) {
  883. metricValidators := newMetricValidators()
  884. mockBrokerBytesRead := mockBrokerMetrics.bytesRead
  885. mockBrokerBytesWritten := mockBrokerMetrics.bytesWritten
  886. // Check that the number of bytes sent corresponds to what the mock broker received
  887. metricValidators.registerForAllBrokers(broker, countMeterValidator("incoming-byte-rate", mockBrokerBytesWritten))
  888. if mockBrokerBytesWritten == 0 {
  889. // This a ProduceRequest with NoResponse
  890. metricValidators.registerForAllBrokers(broker, countMeterValidator("response-rate", 0))
  891. metricValidators.registerForAllBrokers(broker, countHistogramValidator("response-size", 0))
  892. metricValidators.registerForAllBrokers(broker, minMaxHistogramValidator("response-size", 0, 0))
  893. } else {
  894. metricValidators.registerForAllBrokers(broker, countMeterValidator("response-rate", 1))
  895. metricValidators.registerForAllBrokers(broker, countHistogramValidator("response-size", 1))
  896. metricValidators.registerForAllBrokers(broker, minMaxHistogramValidator("response-size", mockBrokerBytesWritten, mockBrokerBytesWritten))
  897. }
  898. // Check that the number of bytes received corresponds to what the mock broker sent
  899. metricValidators.registerForAllBrokers(broker, countMeterValidator("outgoing-byte-rate", mockBrokerBytesRead))
  900. metricValidators.registerForAllBrokers(broker, countMeterValidator("request-rate", 1))
  901. metricValidators.registerForAllBrokers(broker, countHistogramValidator("request-size", 1))
  902. metricValidators.registerForAllBrokers(broker, minMaxHistogramValidator("request-size", mockBrokerBytesRead, mockBrokerBytesRead))
  903. // Check that there is no more requests in flight
  904. metricValidators.registerForAllBrokers(broker, counterValidator("requests-in-flight", 0))
  905. // Run the validators
  906. metricValidators.run(t, broker.conf.MetricRegistry)
  907. }