broker.go 32 KB


  1. package sarama
  2. import (
  3. "crypto/tls"
  4. "encoding/binary"
  5. "fmt"
  6. "io"
  7. "net"
  8. "sort"
  9. "strconv"
  10. "strings"
  11. "sync"
  12. "sync/atomic"
  13. "time"
  14. "github.com/rcrowley/go-metrics"
  15. )
  16. // Broker represents a single Kafka broker connection. All operations on this object are entirely concurrency-safe.
  17. type Broker struct {
  18. id int32
  19. addr string
  20. rack *string
  21. conf *Config
  22. correlationID int32
  23. conn net.Conn
  24. connErr error
  25. lock sync.Mutex
  26. opened int32
  27. responses chan responsePromise
  28. done chan bool
  29. incomingByteRate metrics.Meter
  30. requestRate metrics.Meter
  31. requestSize metrics.Histogram
  32. requestLatency metrics.Histogram
  33. outgoingByteRate metrics.Meter
  34. responseRate metrics.Meter
  35. responseSize metrics.Histogram
  36. brokerIncomingByteRate metrics.Meter
  37. brokerRequestRate metrics.Meter
  38. brokerRequestSize metrics.Histogram
  39. brokerRequestLatency metrics.Histogram
  40. brokerOutgoingByteRate metrics.Meter
  41. brokerResponseRate metrics.Meter
  42. brokerResponseSize metrics.Histogram
  43. }
  // SASLMechanism names the SASL mechanism the client uses to authenticate
  // with the broker.
  type SASLMechanism string

  const (
  	// SASLTypeOAuth represents the SASL/OAUTHBEARER mechanism (Kafka 2.0.0+).
  	SASLTypeOAuth = "OAUTHBEARER"
  	// SASLTypePlaintext represents the SASL/PLAIN mechanism.
  	SASLTypePlaintext = "PLAIN"
  	// SASLTypeSCRAMSHA256 represents the SCRAM-SHA-256 mechanism.
  	SASLTypeSCRAMSHA256 = "SCRAM-SHA-256"
  	// SASLTypeSCRAMSHA512 represents the SCRAM-SHA-512 mechanism.
  	SASLTypeSCRAMSHA512 = "SCRAM-SHA-512"

  	// SASLHandshakeV0 is v0 of the Kafka SASL handshake protocol. Client and
  	// server negotiate SASL auth using opaque packets.
  	SASLHandshakeV0 int16 = 0
  	// SASLHandshakeV1 is v1 of the Kafka SASL handshake protocol. Client and
  	// server negotiate SASL by wrapping tokens with Kafka protocol headers.
  	SASLHandshakeV1 int16 = 1

  	// SASLExtKeyAuth is the reserved extension key name sent as part of the
  	// SASL/OAUTHBEARER initial client response.
  	SASLExtKeyAuth = "auth"
  )
  // AccessToken bundles the access token used to authenticate a
  // SASL/OAUTHBEARER client with its associated metadata.
  type AccessToken struct {
  	// Token is the access token payload.
  	Token string

  	// Extensions is an optional map of arbitrary key-value pairs that can be
  	// sent with the SASL/OAUTHBEARER initial client response. The SASL server
  	// ignores values it does not expect. This feature is only supported by
  	// Kafka >= 2.1.0.
  	Extensions map[string]string
  }
  76. // AccessTokenProvider is the interface that encapsulates how implementors
  77. // can generate access tokens for Kafka broker authentication.
  78. type AccessTokenProvider interface {
  79. // Token returns an access token. The implementation should ensure token
  80. // reuse so that multiple calls at connect time do not create multiple
  81. // tokens. The implementation should also periodically refresh the token in
  82. // order to guarantee that each call returns an unexpired token. This
  83. // method should not block indefinitely--a timeout error should be returned
  84. // after a short period of inactivity so that the broker connection logic
  85. // can log debugging information and retry.
  86. Token() (*AccessToken, error)
  87. }
  // SCRAMClient is an interface to a SCRAM client implementation.
  type SCRAMClient interface {
  	// Begin prepares the client for the SCRAM exchange with the server,
  	// using the given user name and password.
  	Begin(userName, password, authzID string) error

  	// Step advances the client through the SCRAM exchange. It is called
  	// repeatedly until it returns an error or Done returns true.
  	Step(challenge string) (response string, err error)

  	// Done should return true once the SCRAM conversation is over.
  	Done() bool
  }
  // responsePromise is what send hands to responseReceiver for a request that
  // expects a reply. Exactly one of packets or errors receives a value.
  type responsePromise struct {
  	requestTime   time.Time // taken just before the request was written
  	correlationID int32     // correlation ID the request was sent with

  	packets chan []byte // receives the raw response body
  	errors  chan error  // receives the failure, if any
  }
  107. // NewBroker creates and returns a Broker targeting the given host:port address.
  108. // This does not attempt to actually connect, you have to call Open() for that.
  109. func NewBroker(addr string) *Broker {
  110. return &Broker{id: -1, addr: addr}
  111. }
  112. // Open tries to connect to the Broker if it is not already connected or connecting, but does not block
  113. // waiting for the connection to complete. This means that any subsequent operations on the broker will
  114. // block waiting for the connection to succeed or fail. To get the effect of a fully synchronous Open call,
  115. // follow it by a call to Connected(). The only errors Open will return directly are ConfigurationError or
  116. // AlreadyConnected. If conf is nil, the result of NewConfig() is used.
  117. func (b *Broker) Open(conf *Config) error {
  118. if !atomic.CompareAndSwapInt32(&b.opened, 0, 1) {
  119. return ErrAlreadyConnected
  120. }
  121. if conf == nil {
  122. conf = NewConfig()
  123. }
  124. err := conf.Validate()
  125. if err != nil {
  126. return err
  127. }
  128. b.lock.Lock()
  129. go withRecover(func() {
  130. defer b.lock.Unlock()
  131. dialer := net.Dialer{
  132. Timeout: conf.Net.DialTimeout,
  133. KeepAlive: conf.Net.KeepAlive,
  134. LocalAddr: conf.Net.LocalAddr,
  135. }
  136. if conf.Net.TLS.Enable {
  137. b.conn, b.connErr = tls.DialWithDialer(&dialer, "tcp", b.addr, conf.Net.TLS.Config)
  138. } else {
  139. b.conn, b.connErr = dialer.Dial("tcp", b.addr)
  140. }
  141. if b.connErr != nil {
  142. Logger.Printf("Failed to connect to broker %s: %s\n", b.addr, b.connErr)
  143. b.conn = nil
  144. atomic.StoreInt32(&b.opened, 0)
  145. return
  146. }
  147. b.conn = newBufConn(b.conn)
  148. b.conf = conf
  149. // Create or reuse the global metrics shared between brokers
  150. b.incomingByteRate = metrics.GetOrRegisterMeter("incoming-byte-rate", conf.MetricRegistry)
  151. b.requestRate = metrics.GetOrRegisterMeter("request-rate", conf.MetricRegistry)
  152. b.requestSize = getOrRegisterHistogram("request-size", conf.MetricRegistry)
  153. b.requestLatency = getOrRegisterHistogram("request-latency-in-ms", conf.MetricRegistry)
  154. b.outgoingByteRate = metrics.GetOrRegisterMeter("outgoing-byte-rate", conf.MetricRegistry)
  155. b.responseRate = metrics.GetOrRegisterMeter("response-rate", conf.MetricRegistry)
  156. b.responseSize = getOrRegisterHistogram("response-size", conf.MetricRegistry)
  157. // Do not gather metrics for seeded broker (only used during bootstrap) because they share
  158. // the same id (-1) and are already exposed through the global metrics above
  159. if b.id >= 0 {
  160. b.brokerIncomingByteRate = getOrRegisterBrokerMeter("incoming-byte-rate", b, conf.MetricRegistry)
  161. b.brokerRequestRate = getOrRegisterBrokerMeter("request-rate", b, conf.MetricRegistry)
  162. b.brokerRequestSize = getOrRegisterBrokerHistogram("request-size", b, conf.MetricRegistry)
  163. b.brokerRequestLatency = getOrRegisterBrokerHistogram("request-latency-in-ms", b, conf.MetricRegistry)
  164. b.brokerOutgoingByteRate = getOrRegisterBrokerMeter("outgoing-byte-rate", b, conf.MetricRegistry)
  165. b.brokerResponseRate = getOrRegisterBrokerMeter("response-rate", b, conf.MetricRegistry)
  166. b.brokerResponseSize = getOrRegisterBrokerHistogram("response-size", b, conf.MetricRegistry)
  167. }
  168. if conf.Net.SASL.Enable {
  169. b.connErr = b.authenticateViaSASL()
  170. if b.connErr != nil {
  171. err = b.conn.Close()
  172. if err == nil {
  173. Logger.Printf("Closed connection to broker %s\n", b.addr)
  174. } else {
  175. Logger.Printf("Error while closing connection to broker %s: %s\n", b.addr, err)
  176. }
  177. b.conn = nil
  178. atomic.StoreInt32(&b.opened, 0)
  179. return
  180. }
  181. }
  182. b.done = make(chan bool)
  183. b.responses = make(chan responsePromise, b.conf.Net.MaxOpenRequests-1)
  184. if b.id >= 0 {
  185. Logger.Printf("Connected to broker at %s (registered as #%d)\n", b.addr, b.id)
  186. } else {
  187. Logger.Printf("Connected to broker at %s (unregistered)\n", b.addr)
  188. }
  189. go withRecover(b.responseReceiver)
  190. })
  191. return nil
  192. }
  193. // Connected returns true if the broker is connected and false otherwise. If the broker is not
  194. // connected but it had tried to connect, the error from that connection attempt is also returned.
  195. func (b *Broker) Connected() (bool, error) {
  196. b.lock.Lock()
  197. defer b.lock.Unlock()
  198. return b.conn != nil, b.connErr
  199. }
  200. func (b *Broker) Close() error {
  201. b.lock.Lock()
  202. defer b.lock.Unlock()
  203. if b.conn == nil {
  204. return ErrNotConnected
  205. }
  206. close(b.responses)
  207. <-b.done
  208. err := b.conn.Close()
  209. b.conn = nil
  210. b.connErr = nil
  211. b.done = nil
  212. b.responses = nil
  213. if b.id >= 0 {
  214. b.conf.MetricRegistry.Unregister(getMetricNameForBroker("incoming-byte-rate", b))
  215. b.conf.MetricRegistry.Unregister(getMetricNameForBroker("request-rate", b))
  216. b.conf.MetricRegistry.Unregister(getMetricNameForBroker("outgoing-byte-rate", b))
  217. b.conf.MetricRegistry.Unregister(getMetricNameForBroker("response-rate", b))
  218. }
  219. if err == nil {
  220. Logger.Printf("Closed connection to broker %s\n", b.addr)
  221. } else {
  222. Logger.Printf("Error while closing connection to broker %s: %s\n", b.addr, err)
  223. }
  224. atomic.StoreInt32(&b.opened, 0)
  225. return err
  226. }
  227. // ID returns the broker ID retrieved from Kafka's metadata, or -1 if that is not known.
  228. func (b *Broker) ID() int32 {
  229. return b.id
  230. }
  231. // Addr returns the broker address as either retrieved from Kafka's metadata or passed to NewBroker.
  232. func (b *Broker) Addr() string {
  233. return b.addr
  234. }
  235. // Rack returns the broker's rack as retrieved from Kafka's metadata or the
  236. // empty string if it is not known. The returned value corresponds to the
  237. // broker's broker.rack configuration setting. Requires protocol version to be
  238. // at least v0.10.0.0.
  239. func (b *Broker) Rack() string {
  240. if b.rack == nil {
  241. return ""
  242. }
  243. return *b.rack
  244. }
  245. func (b *Broker) GetMetadata(request *MetadataRequest) (*MetadataResponse, error) {
  246. response := new(MetadataResponse)
  247. err := b.sendAndReceive(request, response)
  248. if err != nil {
  249. return nil, err
  250. }
  251. return response, nil
  252. }
  253. func (b *Broker) GetConsumerMetadata(request *ConsumerMetadataRequest) (*ConsumerMetadataResponse, error) {
  254. response := new(ConsumerMetadataResponse)
  255. err := b.sendAndReceive(request, response)
  256. if err != nil {
  257. return nil, err
  258. }
  259. return response, nil
  260. }
  261. func (b *Broker) FindCoordinator(request *FindCoordinatorRequest) (*FindCoordinatorResponse, error) {
  262. response := new(FindCoordinatorResponse)
  263. err := b.sendAndReceive(request, response)
  264. if err != nil {
  265. return nil, err
  266. }
  267. return response, nil
  268. }
  269. func (b *Broker) GetAvailableOffsets(request *OffsetRequest) (*OffsetResponse, error) {
  270. response := new(OffsetResponse)
  271. err := b.sendAndReceive(request, response)
  272. if err != nil {
  273. return nil, err
  274. }
  275. return response, nil
  276. }
  277. func (b *Broker) Produce(request *ProduceRequest) (*ProduceResponse, error) {
  278. var response *ProduceResponse
  279. var err error
  280. if request.RequiredAcks == NoResponse {
  281. err = b.sendAndReceive(request, nil)
  282. } else {
  283. response = new(ProduceResponse)
  284. err = b.sendAndReceive(request, response)
  285. }
  286. if err != nil {
  287. return nil, err
  288. }
  289. return response, nil
  290. }
  291. func (b *Broker) Fetch(request *FetchRequest) (*FetchResponse, error) {
  292. response := new(FetchResponse)
  293. err := b.sendAndReceive(request, response)
  294. if err != nil {
  295. return nil, err
  296. }
  297. return response, nil
  298. }
  299. func (b *Broker) CommitOffset(request *OffsetCommitRequest) (*OffsetCommitResponse, error) {
  300. response := new(OffsetCommitResponse)
  301. err := b.sendAndReceive(request, response)
  302. if err != nil {
  303. return nil, err
  304. }
  305. return response, nil
  306. }
  307. func (b *Broker) FetchOffset(request *OffsetFetchRequest) (*OffsetFetchResponse, error) {
  308. response := new(OffsetFetchResponse)
  309. err := b.sendAndReceive(request, response)
  310. if err != nil {
  311. return nil, err
  312. }
  313. return response, nil
  314. }
  315. func (b *Broker) JoinGroup(request *JoinGroupRequest) (*JoinGroupResponse, error) {
  316. response := new(JoinGroupResponse)
  317. err := b.sendAndReceive(request, response)
  318. if err != nil {
  319. return nil, err
  320. }
  321. return response, nil
  322. }
  323. func (b *Broker) SyncGroup(request *SyncGroupRequest) (*SyncGroupResponse, error) {
  324. response := new(SyncGroupResponse)
  325. err := b.sendAndReceive(request, response)
  326. if err != nil {
  327. return nil, err
  328. }
  329. return response, nil
  330. }
  331. func (b *Broker) LeaveGroup(request *LeaveGroupRequest) (*LeaveGroupResponse, error) {
  332. response := new(LeaveGroupResponse)
  333. err := b.sendAndReceive(request, response)
  334. if err != nil {
  335. return nil, err
  336. }
  337. return response, nil
  338. }
  339. func (b *Broker) Heartbeat(request *HeartbeatRequest) (*HeartbeatResponse, error) {
  340. response := new(HeartbeatResponse)
  341. err := b.sendAndReceive(request, response)
  342. if err != nil {
  343. return nil, err
  344. }
  345. return response, nil
  346. }
  347. func (b *Broker) ListGroups(request *ListGroupsRequest) (*ListGroupsResponse, error) {
  348. response := new(ListGroupsResponse)
  349. err := b.sendAndReceive(request, response)
  350. if err != nil {
  351. return nil, err
  352. }
  353. return response, nil
  354. }
  355. func (b *Broker) DescribeGroups(request *DescribeGroupsRequest) (*DescribeGroupsResponse, error) {
  356. response := new(DescribeGroupsResponse)
  357. err := b.sendAndReceive(request, response)
  358. if err != nil {
  359. return nil, err
  360. }
  361. return response, nil
  362. }
  363. func (b *Broker) ApiVersions(request *ApiVersionsRequest) (*ApiVersionsResponse, error) {
  364. response := new(ApiVersionsResponse)
  365. err := b.sendAndReceive(request, response)
  366. if err != nil {
  367. return nil, err
  368. }
  369. return response, nil
  370. }
  371. func (b *Broker) CreateTopics(request *CreateTopicsRequest) (*CreateTopicsResponse, error) {
  372. response := new(CreateTopicsResponse)
  373. err := b.sendAndReceive(request, response)
  374. if err != nil {
  375. return nil, err
  376. }
  377. return response, nil
  378. }
  379. func (b *Broker) DeleteTopics(request *DeleteTopicsRequest) (*DeleteTopicsResponse, error) {
  380. response := new(DeleteTopicsResponse)
  381. err := b.sendAndReceive(request, response)
  382. if err != nil {
  383. return nil, err
  384. }
  385. return response, nil
  386. }
  387. func (b *Broker) CreatePartitions(request *CreatePartitionsRequest) (*CreatePartitionsResponse, error) {
  388. response := new(CreatePartitionsResponse)
  389. err := b.sendAndReceive(request, response)
  390. if err != nil {
  391. return nil, err
  392. }
  393. return response, nil
  394. }
  395. func (b *Broker) DeleteRecords(request *DeleteRecordsRequest) (*DeleteRecordsResponse, error) {
  396. response := new(DeleteRecordsResponse)
  397. err := b.sendAndReceive(request, response)
  398. if err != nil {
  399. return nil, err
  400. }
  401. return response, nil
  402. }
  403. func (b *Broker) DescribeAcls(request *DescribeAclsRequest) (*DescribeAclsResponse, error) {
  404. response := new(DescribeAclsResponse)
  405. err := b.sendAndReceive(request, response)
  406. if err != nil {
  407. return nil, err
  408. }
  409. return response, nil
  410. }
  411. func (b *Broker) CreateAcls(request *CreateAclsRequest) (*CreateAclsResponse, error) {
  412. response := new(CreateAclsResponse)
  413. err := b.sendAndReceive(request, response)
  414. if err != nil {
  415. return nil, err
  416. }
  417. return response, nil
  418. }
  419. func (b *Broker) DeleteAcls(request *DeleteAclsRequest) (*DeleteAclsResponse, error) {
  420. response := new(DeleteAclsResponse)
  421. err := b.sendAndReceive(request, response)
  422. if err != nil {
  423. return nil, err
  424. }
  425. return response, nil
  426. }
  427. func (b *Broker) InitProducerID(request *InitProducerIDRequest) (*InitProducerIDResponse, error) {
  428. response := new(InitProducerIDResponse)
  429. err := b.sendAndReceive(request, response)
  430. if err != nil {
  431. return nil, err
  432. }
  433. return response, nil
  434. }
  435. func (b *Broker) AddPartitionsToTxn(request *AddPartitionsToTxnRequest) (*AddPartitionsToTxnResponse, error) {
  436. response := new(AddPartitionsToTxnResponse)
  437. err := b.sendAndReceive(request, response)
  438. if err != nil {
  439. return nil, err
  440. }
  441. return response, nil
  442. }
  443. func (b *Broker) AddOffsetsToTxn(request *AddOffsetsToTxnRequest) (*AddOffsetsToTxnResponse, error) {
  444. response := new(AddOffsetsToTxnResponse)
  445. err := b.sendAndReceive(request, response)
  446. if err != nil {
  447. return nil, err
  448. }
  449. return response, nil
  450. }
  451. func (b *Broker) EndTxn(request *EndTxnRequest) (*EndTxnResponse, error) {
  452. response := new(EndTxnResponse)
  453. err := b.sendAndReceive(request, response)
  454. if err != nil {
  455. return nil, err
  456. }
  457. return response, nil
  458. }
  459. func (b *Broker) TxnOffsetCommit(request *TxnOffsetCommitRequest) (*TxnOffsetCommitResponse, error) {
  460. response := new(TxnOffsetCommitResponse)
  461. err := b.sendAndReceive(request, response)
  462. if err != nil {
  463. return nil, err
  464. }
  465. return response, nil
  466. }
  467. func (b *Broker) DescribeConfigs(request *DescribeConfigsRequest) (*DescribeConfigsResponse, error) {
  468. response := new(DescribeConfigsResponse)
  469. err := b.sendAndReceive(request, response)
  470. if err != nil {
  471. return nil, err
  472. }
  473. return response, nil
  474. }
  475. func (b *Broker) AlterConfigs(request *AlterConfigsRequest) (*AlterConfigsResponse, error) {
  476. response := new(AlterConfigsResponse)
  477. err := b.sendAndReceive(request, response)
  478. if err != nil {
  479. return nil, err
  480. }
  481. return response, nil
  482. }
  483. func (b *Broker) DeleteGroups(request *DeleteGroupsRequest) (*DeleteGroupsResponse, error) {
  484. response := new(DeleteGroupsResponse)
  485. if err := b.sendAndReceive(request, response); err != nil {
  486. return nil, err
  487. }
  488. return response, nil
  489. }
  490. func (b *Broker) send(rb protocolBody, promiseResponse bool) (*responsePromise, error) {
  491. b.lock.Lock()
  492. defer b.lock.Unlock()
  493. if b.conn == nil {
  494. if b.connErr != nil {
  495. return nil, b.connErr
  496. }
  497. return nil, ErrNotConnected
  498. }
  499. if !b.conf.Version.IsAtLeast(rb.requiredVersion()) {
  500. return nil, ErrUnsupportedVersion
  501. }
  502. req := &request{correlationID: b.correlationID, clientID: b.conf.ClientID, body: rb}
  503. buf, err := encode(req, b.conf.MetricRegistry)
  504. if err != nil {
  505. return nil, err
  506. }
  507. err = b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout))
  508. if err != nil {
  509. return nil, err
  510. }
  511. requestTime := time.Now()
  512. bytes, err := b.conn.Write(buf)
  513. b.updateOutgoingCommunicationMetrics(bytes)
  514. if err != nil {
  515. return nil, err
  516. }
  517. b.correlationID++
  518. if !promiseResponse {
  519. // Record request latency without the response
  520. b.updateRequestLatencyMetrics(time.Since(requestTime))
  521. return nil, nil
  522. }
  523. promise := responsePromise{requestTime, req.correlationID, make(chan []byte), make(chan error)}
  524. b.responses <- promise
  525. return &promise, nil
  526. }
  527. func (b *Broker) sendAndReceive(req protocolBody, res versionedDecoder) error {
  528. promise, err := b.send(req, res != nil)
  529. if err != nil {
  530. return err
  531. }
  532. if promise == nil {
  533. return nil
  534. }
  535. select {
  536. case buf := <-promise.packets:
  537. return versionedDecode(buf, res, req.version())
  538. case err = <-promise.errors:
  539. return err
  540. }
  541. }
  542. func (b *Broker) decode(pd packetDecoder, version int16) (err error) {
  543. b.id, err = pd.getInt32()
  544. if err != nil {
  545. return err
  546. }
  547. host, err := pd.getString()
  548. if err != nil {
  549. return err
  550. }
  551. port, err := pd.getInt32()
  552. if err != nil {
  553. return err
  554. }
  555. if version >= 1 {
  556. b.rack, err = pd.getNullableString()
  557. if err != nil {
  558. return err
  559. }
  560. }
  561. b.addr = net.JoinHostPort(host, fmt.Sprint(port))
  562. if _, _, err := net.SplitHostPort(b.addr); err != nil {
  563. return err
  564. }
  565. return nil
  566. }
  567. func (b *Broker) encode(pe packetEncoder, version int16) (err error) {
  568. host, portstr, err := net.SplitHostPort(b.addr)
  569. if err != nil {
  570. return err
  571. }
  572. port, err := strconv.Atoi(portstr)
  573. if err != nil {
  574. return err
  575. }
  576. pe.putInt32(b.id)
  577. err = pe.putString(host)
  578. if err != nil {
  579. return err
  580. }
  581. pe.putInt32(int32(port))
  582. if version >= 1 {
  583. err = pe.putNullableString(b.rack)
  584. if err != nil {
  585. return err
  586. }
  587. }
  588. return nil
  589. }
  590. func (b *Broker) responseReceiver() {
  591. var dead error
  592. header := make([]byte, 8)
  593. for response := range b.responses {
  594. if dead != nil {
  595. response.errors <- dead
  596. continue
  597. }
  598. err := b.conn.SetReadDeadline(time.Now().Add(b.conf.Net.ReadTimeout))
  599. if err != nil {
  600. dead = err
  601. response.errors <- err
  602. continue
  603. }
  604. bytesReadHeader, err := io.ReadFull(b.conn, header)
  605. requestLatency := time.Since(response.requestTime)
  606. if err != nil {
  607. b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
  608. dead = err
  609. response.errors <- err
  610. continue
  611. }
  612. decodedHeader := responseHeader{}
  613. err = decode(header, &decodedHeader)
  614. if err != nil {
  615. b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
  616. dead = err
  617. response.errors <- err
  618. continue
  619. }
  620. if decodedHeader.correlationID != response.correlationID {
  621. b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
  622. // TODO if decoded ID < cur ID, discard until we catch up
  623. // TODO if decoded ID > cur ID, save it so when cur ID catches up we have a response
  624. dead = PacketDecodingError{fmt.Sprintf("correlation ID didn't match, wanted %d, got %d", response.correlationID, decodedHeader.correlationID)}
  625. response.errors <- dead
  626. continue
  627. }
  628. buf := make([]byte, decodedHeader.length-4)
  629. bytesReadBody, err := io.ReadFull(b.conn, buf)
  630. b.updateIncomingCommunicationMetrics(bytesReadHeader+bytesReadBody, requestLatency)
  631. if err != nil {
  632. dead = err
  633. response.errors <- err
  634. continue
  635. }
  636. response.packets <- buf
  637. }
  638. close(b.done)
  639. }
  640. func (b *Broker) authenticateViaSASL() error {
  641. switch b.conf.Net.SASL.Mechanism {
  642. case SASLTypeOAuth:
  643. return b.sendAndReceiveSASLOAuth(b.conf.Net.SASL.TokenProvider)
  644. case SASLTypeSCRAMSHA256, SASLTypeSCRAMSHA512:
  645. return b.sendAndReceiveSASLSCRAMv1()
  646. default:
  647. return b.sendAndReceiveSASLPlainAuth()
  648. }
  649. }
  650. func (b *Broker) sendAndReceiveSASLHandshake(saslType SASLMechanism, version int16) error {
  651. rb := &SaslHandshakeRequest{Mechanism: string(saslType), Version: version}
  652. req := &request{correlationID: b.correlationID, clientID: b.conf.ClientID, body: rb}
  653. buf, err := encode(req, b.conf.MetricRegistry)
  654. if err != nil {
  655. return err
  656. }
  657. err = b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout))
  658. if err != nil {
  659. return err
  660. }
  661. requestTime := time.Now()
  662. bytes, err := b.conn.Write(buf)
  663. b.updateOutgoingCommunicationMetrics(bytes)
  664. if err != nil {
  665. Logger.Printf("Failed to send SASL handshake %s: %s\n", b.addr, err.Error())
  666. return err
  667. }
  668. b.correlationID++
  669. //wait for the response
  670. header := make([]byte, 8) // response header
  671. _, err = io.ReadFull(b.conn, header)
  672. if err != nil {
  673. Logger.Printf("Failed to read SASL handshake header : %s\n", err.Error())
  674. return err
  675. }
  676. length := binary.BigEndian.Uint32(header[:4])
  677. payload := make([]byte, length-4)
  678. n, err := io.ReadFull(b.conn, payload)
  679. if err != nil {
  680. Logger.Printf("Failed to read SASL handshake payload : %s\n", err.Error())
  681. return err
  682. }
  683. b.updateIncomingCommunicationMetrics(n+8, time.Since(requestTime))
  684. res := &SaslHandshakeResponse{}
  685. err = versionedDecode(payload, res, 0)
  686. if err != nil {
  687. Logger.Printf("Failed to parse SASL handshake : %s\n", err.Error())
  688. return err
  689. }
  690. if res.Err != ErrNoError {
  691. Logger.Printf("Invalid SASL Mechanism : %s\n", res.Err.Error())
  692. return res.Err
  693. }
  694. Logger.Print("Successful SASL handshake. Available mechanisms: ", res.EnabledMechanisms)
  695. return nil
  696. }
  697. // Kafka 0.10.0 plans to support SASL Plain and Kerberos as per PR #812 (KIP-43)/(JIRA KAFKA-3149)
  698. // Some hosted kafka services such as IBM Message Hub already offer SASL/PLAIN auth with Kafka 0.9
  699. //
  700. // In SASL Plain, Kafka expects the auth header to be in the following format
  701. // Message format (from https://tools.ietf.org/html/rfc4616):
  702. //
  703. // message = [authzid] UTF8NUL authcid UTF8NUL passwd
  704. // authcid = 1*SAFE ; MUST accept up to 255 octets
  705. // authzid = 1*SAFE ; MUST accept up to 255 octets
  706. // passwd = 1*SAFE ; MUST accept up to 255 octets
  707. // UTF8NUL = %x00 ; UTF-8 encoded NUL character
  708. //
  709. // SAFE = UTF1 / UTF2 / UTF3 / UTF4
  710. // ;; any UTF-8 encoded Unicode character except NUL
  711. //
  712. // When credentials are valid, Kafka returns a 4 byte array of null characters.
  713. // When credentials are invalid, Kafka closes the connection. This does not seem to be the ideal way
  714. // of responding to bad credentials but thats how its being done today.
  715. func (b *Broker) sendAndReceiveSASLPlainAuth() error {
  716. if b.conf.Net.SASL.Handshake {
  717. handshakeErr := b.sendAndReceiveSASLHandshake(SASLTypePlaintext, SASLHandshakeV0)
  718. if handshakeErr != nil {
  719. Logger.Printf("Error while performing SASL handshake %s\n", b.addr)
  720. return handshakeErr
  721. }
  722. }
  723. length := 1 + len(b.conf.Net.SASL.User) + 1 + len(b.conf.Net.SASL.Password)
  724. authBytes := make([]byte, length+4) //4 byte length header + auth data
  725. binary.BigEndian.PutUint32(authBytes, uint32(length))
  726. copy(authBytes[4:], []byte("\x00"+b.conf.Net.SASL.User+"\x00"+b.conf.Net.SASL.Password))
  727. err := b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout))
  728. if err != nil {
  729. Logger.Printf("Failed to set write deadline when doing SASL auth with broker %s: %s\n", b.addr, err.Error())
  730. return err
  731. }
  732. requestTime := time.Now()
  733. bytesWritten, err := b.conn.Write(authBytes)
  734. b.updateOutgoingCommunicationMetrics(bytesWritten)
  735. if err != nil {
  736. Logger.Printf("Failed to write SASL auth header to broker %s: %s\n", b.addr, err.Error())
  737. return err
  738. }
  739. header := make([]byte, 4)
  740. n, err := io.ReadFull(b.conn, header)
  741. b.updateIncomingCommunicationMetrics(n, time.Since(requestTime))
  742. // If the credentials are valid, we would get a 4 byte response filled with null characters.
  743. // Otherwise, the broker closes the connection and we get an EOF
  744. if err != nil {
  745. Logger.Printf("Failed to read response while authenticating with SASL to broker %s: %s\n", b.addr, err.Error())
  746. return err
  747. }
  748. Logger.Printf("SASL authentication successful with broker %s:%v - %v\n", b.addr, n, header)
  749. return nil
  750. }
  751. // sendAndReceiveSASLOAuth performs the authentication flow as described by KIP-255
  752. // https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=75968876
  753. func (b *Broker) sendAndReceiveSASLOAuth(provider AccessTokenProvider) error {
  754. if err := b.sendAndReceiveSASLHandshake(SASLTypeOAuth, SASLHandshakeV1); err != nil {
  755. return err
  756. }
  757. token, err := provider.Token()
  758. if err != nil {
  759. return err
  760. }
  761. requestTime := time.Now()
  762. correlationID := b.correlationID
  763. bytesWritten, err := b.sendSASLOAuthBearerClientResponse(token, correlationID)
  764. if err != nil {
  765. return err
  766. }
  767. b.updateOutgoingCommunicationMetrics(bytesWritten)
  768. b.correlationID++
  769. bytesRead, err := b.receiveSASLOAuthBearerServerResponse(correlationID)
  770. if err != nil {
  771. return err
  772. }
  773. requestLatency := time.Since(requestTime)
  774. b.updateIncomingCommunicationMetrics(bytesRead, requestLatency)
  775. return nil
  776. }
  777. func (b *Broker) sendAndReceiveSASLSCRAMv1() error {
  778. if err := b.sendAndReceiveSASLHandshake(b.conf.Net.SASL.Mechanism, SASLHandshakeV1); err != nil {
  779. return err
  780. }
  781. scramClient := b.conf.Net.SASL.SCRAMClient
  782. if err := scramClient.Begin(b.conf.Net.SASL.User, b.conf.Net.SASL.Password, b.conf.Net.SASL.SCRAMAuthzID); err != nil {
  783. return fmt.Errorf("failed to start SCRAM exchange with the server: %s", err.Error())
  784. }
  785. msg, err := scramClient.Step("")
  786. if err != nil {
  787. return fmt.Errorf("failed to advance the SCRAM exchange: %s", err.Error())
  788. }
  789. for !scramClient.Done() {
  790. requestTime := time.Now()
  791. correlationID := b.correlationID
  792. bytesWritten, err := b.sendSaslAuthenticateRequest(correlationID, []byte(msg))
  793. if err != nil {
  794. Logger.Printf("Failed to write SASL auth header to broker %s: %s\n", b.addr, err.Error())
  795. return err
  796. }
  797. b.updateOutgoingCommunicationMetrics(bytesWritten)
  798. b.correlationID++
  799. challenge, err := b.receiveSaslAuthenticateResponse(correlationID)
  800. if err != nil {
  801. Logger.Printf("Failed to read response while authenticating with SASL to broker %s: %s\n", b.addr, err.Error())
  802. return err
  803. }
  804. b.updateIncomingCommunicationMetrics(len(challenge), time.Since(requestTime))
  805. msg, err = scramClient.Step(string(challenge))
  806. if err != nil {
  807. Logger.Println("SASL authentication failed", err)
  808. return err
  809. }
  810. }
  811. Logger.Println("SASL authentication succeeded")
  812. return nil
  813. }
  814. func (b *Broker) sendSaslAuthenticateRequest(correlationID int32, msg []byte) (int, error) {
  815. rb := &SaslAuthenticateRequest{msg}
  816. req := &request{correlationID: correlationID, clientID: b.conf.ClientID, body: rb}
  817. buf, err := encode(req, b.conf.MetricRegistry)
  818. if err != nil {
  819. return 0, err
  820. }
  821. if err := b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout)); err != nil {
  822. return 0, err
  823. }
  824. return b.conn.Write(buf)
  825. }
  826. func (b *Broker) receiveSaslAuthenticateResponse(correlationID int32) ([]byte, error) {
  827. buf := make([]byte, responseLengthSize+correlationIDSize)
  828. _, err := io.ReadFull(b.conn, buf)
  829. if err != nil {
  830. return nil, err
  831. }
  832. header := responseHeader{}
  833. err = decode(buf, &header)
  834. if err != nil {
  835. return nil, err
  836. }
  837. if header.correlationID != correlationID {
  838. return nil, fmt.Errorf("correlation ID didn't match, wanted %d, got %d", b.correlationID, header.correlationID)
  839. }
  840. buf = make([]byte, header.length-correlationIDSize)
  841. _, err = io.ReadFull(b.conn, buf)
  842. if err != nil {
  843. return nil, err
  844. }
  845. res := &SaslAuthenticateResponse{}
  846. if err := versionedDecode(buf, res, 0); err != nil {
  847. return nil, err
  848. }
  849. if res.Err != ErrNoError {
  850. return nil, res.Err
  851. }
  852. return res.SaslAuthBytes, nil
  853. }
  854. // Build SASL/OAUTHBEARER initial client response as described by RFC-7628
  855. // https://tools.ietf.org/html/rfc7628
  856. func buildClientInitialResponse(token *AccessToken) ([]byte, error) {
  857. var ext string
  858. if token.Extensions != nil && len(token.Extensions) > 0 {
  859. if _, ok := token.Extensions[SASLExtKeyAuth]; ok {
  860. return []byte{}, fmt.Errorf("The extension `%s` is invalid", SASLExtKeyAuth)
  861. }
  862. ext = "\x01" + mapToString(token.Extensions, "=", "\x01")
  863. }
  864. resp := []byte(fmt.Sprintf("n,,\x01auth=Bearer %s%s\x01\x01", token.Token, ext))
  865. return resp, nil
  866. }
  867. // mapToString returns a list of key-value pairs ordered by key.
  868. // keyValSep separates the key from the value. elemSep separates each pair.
  869. func mapToString(extensions map[string]string, keyValSep string, elemSep string) string {
  870. buf := make([]string, 0, len(extensions))
  871. for k, v := range extensions {
  872. buf = append(buf, k+keyValSep+v)
  873. }
  874. sort.Strings(buf)
  875. return strings.Join(buf, elemSep)
  876. }
  877. func (b *Broker) sendSASLOAuthBearerClientResponse(token *AccessToken, correlationID int32) (int, error) {
  878. initialResp, err := buildClientInitialResponse(token)
  879. if err != nil {
  880. return 0, err
  881. }
  882. rb := &SaslAuthenticateRequest{initialResp}
  883. req := &request{correlationID: correlationID, clientID: b.conf.ClientID, body: rb}
  884. buf, err := encode(req, b.conf.MetricRegistry)
  885. if err != nil {
  886. return 0, err
  887. }
  888. if err := b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout)); err != nil {
  889. return 0, err
  890. }
  891. return b.conn.Write(buf)
  892. }
  893. func (b *Broker) receiveSASLOAuthBearerServerResponse(correlationID int32) (int, error) {
  894. buf := make([]byte, 8)
  895. bytesRead, err := io.ReadFull(b.conn, buf)
  896. if err != nil {
  897. return bytesRead, err
  898. }
  899. header := responseHeader{}
  900. err = decode(buf, &header)
  901. if err != nil {
  902. return bytesRead, err
  903. }
  904. if header.correlationID != correlationID {
  905. return bytesRead, fmt.Errorf("correlation ID didn't match, wanted %d, got %d", b.correlationID, header.correlationID)
  906. }
  907. buf = make([]byte, header.length-4)
  908. c, err := io.ReadFull(b.conn, buf)
  909. bytesRead += c
  910. if err != nil {
  911. return bytesRead, err
  912. }
  913. res := &SaslAuthenticateResponse{}
  914. if err := versionedDecode(buf, res, 0); err != nil {
  915. return bytesRead, err
  916. }
  917. if err != nil {
  918. return bytesRead, err
  919. }
  920. if res.Err != ErrNoError {
  921. return bytesRead, res.Err
  922. }
  923. if len(res.SaslAuthBytes) > 0 {
  924. Logger.Printf("Received SASL auth response: %s", res.SaslAuthBytes)
  925. }
  926. return bytesRead, nil
  927. }
  928. func (b *Broker) updateIncomingCommunicationMetrics(bytes int, requestLatency time.Duration) {
  929. b.updateRequestLatencyMetrics(requestLatency)
  930. b.responseRate.Mark(1)
  931. if b.brokerResponseRate != nil {
  932. b.brokerResponseRate.Mark(1)
  933. }
  934. responseSize := int64(bytes)
  935. b.incomingByteRate.Mark(responseSize)
  936. if b.brokerIncomingByteRate != nil {
  937. b.brokerIncomingByteRate.Mark(responseSize)
  938. }
  939. b.responseSize.Update(responseSize)
  940. if b.brokerResponseSize != nil {
  941. b.brokerResponseSize.Update(responseSize)
  942. }
  943. }
  944. func (b *Broker) updateRequestLatencyMetrics(requestLatency time.Duration) {
  945. requestLatencyInMs := int64(requestLatency / time.Millisecond)
  946. b.requestLatency.Update(requestLatencyInMs)
  947. if b.brokerRequestLatency != nil {
  948. b.brokerRequestLatency.Update(requestLatencyInMs)
  949. }
  950. }
  951. func (b *Broker) updateOutgoingCommunicationMetrics(bytes int) {
  952. b.requestRate.Mark(1)
  953. if b.brokerRequestRate != nil {
  954. b.brokerRequestRate.Mark(1)
  955. }
  956. requestSize := int64(bytes)
  957. b.outgoingByteRate.Mark(requestSize)
  958. if b.brokerOutgoingByteRate != nil {
  959. b.brokerOutgoingByteRate.Mark(requestSize)
  960. }
  961. b.requestSize.Update(requestSize)
  962. if b.brokerRequestSize != nil {
  963. b.brokerRequestSize.Update(requestSize)
  964. }
  965. }