broker.go 34 KB


  1. package sarama
  2. import (
  3. "crypto/tls"
  4. "encoding/binary"
  5. "fmt"
  6. "io"
  7. "net"
  8. "sort"
  9. "strconv"
  10. "strings"
  11. "sync"
  12. "sync/atomic"
  13. "time"
  14. "github.com/rcrowley/go-metrics"
  15. )
  16. // Broker represents a single Kafka broker connection. All operations on this object are entirely concurrency-safe.
  17. type Broker struct {
  18. conf *Config
  19. rack *string
  20. id int32
  21. addr string
  22. correlationID int32
  23. conn net.Conn
  24. connErr error
  25. lock sync.Mutex
  26. opened int32
  27. responses chan responsePromise
  28. done chan bool
  29. registeredMetrics []string
  30. incomingByteRate metrics.Meter
  31. requestRate metrics.Meter
  32. requestSize metrics.Histogram
  33. requestLatency metrics.Histogram
  34. outgoingByteRate metrics.Meter
  35. responseRate metrics.Meter
  36. responseSize metrics.Histogram
  37. brokerIncomingByteRate metrics.Meter
  38. brokerRequestRate metrics.Meter
  39. brokerRequestSize metrics.Histogram
  40. brokerRequestLatency metrics.Histogram
  41. brokerOutgoingByteRate metrics.Meter
  42. brokerResponseRate metrics.Meter
  43. brokerResponseSize metrics.Histogram
  44. }
  45. // SASLMechanism specifies the SASL mechanism the client uses to authenticate with the broker
  46. type SASLMechanism string
  47. const (
  48. // SASLTypeOAuth represents the SASL/OAUTHBEARER mechanism (Kafka 2.0.0+)
  49. SASLTypeOAuth = "OAUTHBEARER"
  50. // SASLTypePlaintext represents the SASL/PLAIN mechanism
  51. SASLTypePlaintext = "PLAIN"
  52. // SASLTypeSCRAMSHA256 represents the SCRAM-SHA-256 mechanism.
  53. SASLTypeSCRAMSHA256 = "SCRAM-SHA-256"
  54. // SASLTypeSCRAMSHA512 represents the SCRAM-SHA-512 mechanism.
  55. SASLTypeSCRAMSHA512 = "SCRAM-SHA-512"
  56. // SASLHandshakeV0 is v0 of the Kafka SASL handshake protocol. Client and
  57. // server negotiate SASL auth using opaque packets.
  58. SASLHandshakeV0 = int16(0)
  59. // SASLHandshakeV1 is v1 of the Kafka SASL handshake protocol. Client and
  60. // server negotiate SASL by wrapping tokens with Kafka protocol headers.
  61. SASLHandshakeV1 = int16(1)
  62. // SASLExtKeyAuth is the reserved extension key name sent as part of the
  63. // SASL/OAUTHBEARER intial client response
  64. SASLExtKeyAuth = "auth"
  65. )
  66. // AccessToken contains an access token used to authenticate a
  67. // SASL/OAUTHBEARER client along with associated metadata.
  68. type AccessToken struct {
  69. // Token is the access token payload.
  70. Token string
  71. // Extensions is a optional map of arbitrary key-value pairs that can be
  72. // sent with the SASL/OAUTHBEARER initial client response. These values are
  73. // ignored by the SASL server if they are unexpected. This feature is only
  74. // supported by Kafka >= 2.1.0.
  75. Extensions map[string]string
  76. }
  77. // AccessTokenProvider is the interface that encapsulates how implementors
  78. // can generate access tokens for Kafka broker authentication.
  79. type AccessTokenProvider interface {
  80. // Token returns an access token. The implementation should ensure token
  81. // reuse so that multiple calls at connect time do not create multiple
  82. // tokens. The implementation should also periodically refresh the token in
  83. // order to guarantee that each call returns an unexpired token. This
  84. // method should not block indefinitely--a timeout error should be returned
  85. // after a short period of inactivity so that the broker connection logic
  86. // can log debugging information and retry.
  87. Token() (*AccessToken, error)
  88. }
  89. // SCRAMClient is a an interface to a SCRAM
  90. // client implementation.
  91. type SCRAMClient interface {
  92. // Begin prepares the client for the SCRAM exchange
  93. // with the server with a user name and a password
  94. Begin(userName, password, authzID string) error
  95. // Step steps client through the SCRAM exchange. It is
  96. // called repeatedly until it errors or `Done` returns true.
  97. Step(challenge string) (response string, err error)
  98. // Done should return true when the SCRAM conversation
  99. // is over.
  100. Done() bool
  101. }
  102. type responsePromise struct {
  103. requestTime time.Time
  104. correlationID int32
  105. packets chan []byte
  106. errors chan error
  107. }
  108. // NewBroker creates and returns a Broker targeting the given host:port address.
  109. // This does not attempt to actually connect, you have to call Open() for that.
  110. func NewBroker(addr string) *Broker {
  111. return &Broker{id: -1, addr: addr}
  112. }
  113. // Open tries to connect to the Broker if it is not already connected or connecting, but does not block
  114. // waiting for the connection to complete. This means that any subsequent operations on the broker will
  115. // block waiting for the connection to succeed or fail. To get the effect of a fully synchronous Open call,
  116. // follow it by a call to Connected(). The only errors Open will return directly are ConfigurationError or
  117. // AlreadyConnected. If conf is nil, the result of NewConfig() is used.
  118. func (b *Broker) Open(conf *Config) error {
  119. if !atomic.CompareAndSwapInt32(&b.opened, 0, 1) {
  120. return ErrAlreadyConnected
  121. }
  122. if conf == nil {
  123. conf = NewConfig()
  124. }
  125. err := conf.Validate()
  126. if err != nil {
  127. return err
  128. }
  129. b.lock.Lock()
  130. go withRecover(func() {
  131. defer b.lock.Unlock()
  132. dialer := net.Dialer{
  133. Timeout: conf.Net.DialTimeout,
  134. KeepAlive: conf.Net.KeepAlive,
  135. LocalAddr: conf.Net.LocalAddr,
  136. }
  137. if conf.Net.TLS.Enable {
  138. b.conn, b.connErr = tls.DialWithDialer(&dialer, "tcp", b.addr, conf.Net.TLS.Config)
  139. } else {
  140. b.conn, b.connErr = dialer.Dial("tcp", b.addr)
  141. }
  142. if b.connErr != nil {
  143. Logger.Printf("Failed to connect to broker %s: %s\n", b.addr, b.connErr)
  144. b.conn = nil
  145. atomic.StoreInt32(&b.opened, 0)
  146. return
  147. }
  148. b.conn = newBufConn(b.conn)
  149. b.conf = conf
  150. // Create or reuse the global metrics shared between brokers
  151. b.incomingByteRate = metrics.GetOrRegisterMeter("incoming-byte-rate", conf.MetricRegistry)
  152. b.requestRate = metrics.GetOrRegisterMeter("request-rate", conf.MetricRegistry)
  153. b.requestSize = getOrRegisterHistogram("request-size", conf.MetricRegistry)
  154. b.requestLatency = getOrRegisterHistogram("request-latency-in-ms", conf.MetricRegistry)
  155. b.outgoingByteRate = metrics.GetOrRegisterMeter("outgoing-byte-rate", conf.MetricRegistry)
  156. b.responseRate = metrics.GetOrRegisterMeter("response-rate", conf.MetricRegistry)
  157. b.responseSize = getOrRegisterHistogram("response-size", conf.MetricRegistry)
  158. // Do not gather metrics for seeded broker (only used during bootstrap) because they share
  159. // the same id (-1) and are already exposed through the global metrics above
  160. if b.id >= 0 {
  161. b.registerMetrics()
  162. }
  163. if conf.Net.SASL.Enable {
  164. b.connErr = b.authenticateViaSASL()
  165. if b.connErr != nil {
  166. err = b.conn.Close()
  167. if err == nil {
  168. Logger.Printf("Closed connection to broker %s\n", b.addr)
  169. } else {
  170. Logger.Printf("Error while closing connection to broker %s: %s\n", b.addr, err)
  171. }
  172. b.conn = nil
  173. atomic.StoreInt32(&b.opened, 0)
  174. return
  175. }
  176. }
  177. b.done = make(chan bool)
  178. b.responses = make(chan responsePromise, b.conf.Net.MaxOpenRequests-1)
  179. if b.id >= 0 {
  180. Logger.Printf("Connected to broker at %s (registered as #%d)\n", b.addr, b.id)
  181. } else {
  182. Logger.Printf("Connected to broker at %s (unregistered)\n", b.addr)
  183. }
  184. go withRecover(b.responseReceiver)
  185. })
  186. return nil
  187. }
  188. // Connected returns true if the broker is connected and false otherwise. If the broker is not
  189. // connected but it had tried to connect, the error from that connection attempt is also returned.
  190. func (b *Broker) Connected() (bool, error) {
  191. b.lock.Lock()
  192. defer b.lock.Unlock()
  193. return b.conn != nil, b.connErr
  194. }
  195. //Close closes the broker resources
  196. func (b *Broker) Close() error {
  197. b.lock.Lock()
  198. defer b.lock.Unlock()
  199. if b.conn == nil {
  200. return ErrNotConnected
  201. }
  202. close(b.responses)
  203. <-b.done
  204. err := b.conn.Close()
  205. b.conn = nil
  206. b.connErr = nil
  207. b.done = nil
  208. b.responses = nil
  209. b.unregisterMetrics()
  210. if err == nil {
  211. Logger.Printf("Closed connection to broker %s\n", b.addr)
  212. } else {
  213. Logger.Printf("Error while closing connection to broker %s: %s\n", b.addr, err)
  214. }
  215. atomic.StoreInt32(&b.opened, 0)
  216. return err
  217. }
  218. // ID returns the broker ID retrieved from Kafka's metadata, or -1 if that is not known.
  219. func (b *Broker) ID() int32 {
  220. return b.id
  221. }
  222. // Addr returns the broker address as either retrieved from Kafka's metadata or passed to NewBroker.
  223. func (b *Broker) Addr() string {
  224. return b.addr
  225. }
  226. // Rack returns the broker's rack as retrieved from Kafka's metadata or the
  227. // empty string if it is not known. The returned value corresponds to the
  228. // broker's broker.rack configuration setting. Requires protocol version to be
  229. // at least v0.10.0.0.
  230. func (b *Broker) Rack() string {
  231. if b.rack == nil {
  232. return ""
  233. }
  234. return *b.rack
  235. }
  236. //GetMetadata send a metadata request and returns a metadata response or error
  237. func (b *Broker) GetMetadata(request *MetadataRequest) (*MetadataResponse, error) {
  238. response := new(MetadataResponse)
  239. err := b.sendAndReceive(request, response)
  240. if err != nil {
  241. return nil, err
  242. }
  243. return response, nil
  244. }
  245. //GetConsumerMetadata send a consumer metadata request and returns a consumer metadata response or error
  246. func (b *Broker) GetConsumerMetadata(request *ConsumerMetadataRequest) (*ConsumerMetadataResponse, error) {
  247. response := new(ConsumerMetadataResponse)
  248. err := b.sendAndReceive(request, response)
  249. if err != nil {
  250. return nil, err
  251. }
  252. return response, nil
  253. }
  254. //FindCoordinator sends a find coordinate request and returns a response or error
  255. func (b *Broker) FindCoordinator(request *FindCoordinatorRequest) (*FindCoordinatorResponse, error) {
  256. response := new(FindCoordinatorResponse)
  257. err := b.sendAndReceive(request, response)
  258. if err != nil {
  259. return nil, err
  260. }
  261. return response, nil
  262. }
  263. //GetAvailableOffsets return an offset response or error
  264. func (b *Broker) GetAvailableOffsets(request *OffsetRequest) (*OffsetResponse, error) {
  265. response := new(OffsetResponse)
  266. err := b.sendAndReceive(request, response)
  267. if err != nil {
  268. return nil, err
  269. }
  270. return response, nil
  271. }
  272. //Produce returns a produce response or error
  273. func (b *Broker) Produce(request *ProduceRequest) (*ProduceResponse, error) {
  274. var (
  275. response *ProduceResponse
  276. err error
  277. )
  278. if request.RequiredAcks == NoResponse {
  279. err = b.sendAndReceive(request, nil)
  280. } else {
  281. response = new(ProduceResponse)
  282. err = b.sendAndReceive(request, response)
  283. }
  284. if err != nil {
  285. return nil, err
  286. }
  287. return response, nil
  288. }
  289. //Fetch returns a FetchResponse or error
  290. func (b *Broker) Fetch(request *FetchRequest) (*FetchResponse, error) {
  291. response := new(FetchResponse)
  292. err := b.sendAndReceive(request, response)
  293. if err != nil {
  294. return nil, err
  295. }
  296. return response, nil
  297. }
  298. //CommitOffset return an Offset commit reponse or error
  299. func (b *Broker) CommitOffset(request *OffsetCommitRequest) (*OffsetCommitResponse, error) {
  300. response := new(OffsetCommitResponse)
  301. err := b.sendAndReceive(request, response)
  302. if err != nil {
  303. return nil, err
  304. }
  305. return response, nil
  306. }
  307. //FetchOffset returns an offset fetch response or error
  308. func (b *Broker) FetchOffset(request *OffsetFetchRequest) (*OffsetFetchResponse, error) {
  309. response := new(OffsetFetchResponse)
  310. err := b.sendAndReceive(request, response)
  311. if err != nil {
  312. return nil, err
  313. }
  314. return response, nil
  315. }
  316. //JoinGroup returns a join group response or error
  317. func (b *Broker) JoinGroup(request *JoinGroupRequest) (*JoinGroupResponse, error) {
  318. response := new(JoinGroupResponse)
  319. err := b.sendAndReceive(request, response)
  320. if err != nil {
  321. return nil, err
  322. }
  323. return response, nil
  324. }
  325. //SyncGroup returns a sync group response or error
  326. func (b *Broker) SyncGroup(request *SyncGroupRequest) (*SyncGroupResponse, error) {
  327. response := new(SyncGroupResponse)
  328. err := b.sendAndReceive(request, response)
  329. if err != nil {
  330. return nil, err
  331. }
  332. return response, nil
  333. }
  334. //LeaveGroup return a leave group response or error
  335. func (b *Broker) LeaveGroup(request *LeaveGroupRequest) (*LeaveGroupResponse, error) {
  336. response := new(LeaveGroupResponse)
  337. err := b.sendAndReceive(request, response)
  338. if err != nil {
  339. return nil, err
  340. }
  341. return response, nil
  342. }
  343. //Heartbeat returns a heartbeat response or error
  344. func (b *Broker) Heartbeat(request *HeartbeatRequest) (*HeartbeatResponse, error) {
  345. response := new(HeartbeatResponse)
  346. err := b.sendAndReceive(request, response)
  347. if err != nil {
  348. return nil, err
  349. }
  350. return response, nil
  351. }
  352. //ListGroups return a list group response or error
  353. func (b *Broker) ListGroups(request *ListGroupsRequest) (*ListGroupsResponse, error) {
  354. response := new(ListGroupsResponse)
  355. err := b.sendAndReceive(request, response)
  356. if err != nil {
  357. return nil, err
  358. }
  359. return response, nil
  360. }
  361. //DescribeGroups return describe group response or error
  362. func (b *Broker) DescribeGroups(request *DescribeGroupsRequest) (*DescribeGroupsResponse, error) {
  363. response := new(DescribeGroupsResponse)
  364. err := b.sendAndReceive(request, response)
  365. if err != nil {
  366. return nil, err
  367. }
  368. return response, nil
  369. }
  370. //ApiVersions return api version response or error
  371. func (b *Broker) ApiVersions(request *ApiVersionsRequest) (*ApiVersionsResponse, error) {
  372. response := new(ApiVersionsResponse)
  373. err := b.sendAndReceive(request, response)
  374. if err != nil {
  375. return nil, err
  376. }
  377. return response, nil
  378. }
  379. //CreateTopics send a create topic request and returns create topic response
  380. func (b *Broker) CreateTopics(request *CreateTopicsRequest) (*CreateTopicsResponse, error) {
  381. response := new(CreateTopicsResponse)
  382. err := b.sendAndReceive(request, response)
  383. if err != nil {
  384. return nil, err
  385. }
  386. return response, nil
  387. }
  388. //DeleteTopics sends a delete topic request and returns delete topic response
  389. func (b *Broker) DeleteTopics(request *DeleteTopicsRequest) (*DeleteTopicsResponse, error) {
  390. response := new(DeleteTopicsResponse)
  391. err := b.sendAndReceive(request, response)
  392. if err != nil {
  393. return nil, err
  394. }
  395. return response, nil
  396. }
  397. //CreatePartitions sends a create partition request and returns create
  398. //partitions response or error
  399. func (b *Broker) CreatePartitions(request *CreatePartitionsRequest) (*CreatePartitionsResponse, error) {
  400. response := new(CreatePartitionsResponse)
  401. err := b.sendAndReceive(request, response)
  402. if err != nil {
  403. return nil, err
  404. }
  405. return response, nil
  406. }
  407. //DeleteRecords send a request to delete records and return delete record
  408. //response or error
  409. func (b *Broker) DeleteRecords(request *DeleteRecordsRequest) (*DeleteRecordsResponse, error) {
  410. response := new(DeleteRecordsResponse)
  411. err := b.sendAndReceive(request, response)
  412. if err != nil {
  413. return nil, err
  414. }
  415. return response, nil
  416. }
  417. //DescribeAcls sends a describe acl request and returns a response or error
  418. func (b *Broker) DescribeAcls(request *DescribeAclsRequest) (*DescribeAclsResponse, error) {
  419. response := new(DescribeAclsResponse)
  420. err := b.sendAndReceive(request, response)
  421. if err != nil {
  422. return nil, err
  423. }
  424. return response, nil
  425. }
  426. //CreateAcls sends a create acl request and returns a response or error
  427. func (b *Broker) CreateAcls(request *CreateAclsRequest) (*CreateAclsResponse, error) {
  428. response := new(CreateAclsResponse)
  429. err := b.sendAndReceive(request, response)
  430. if err != nil {
  431. return nil, err
  432. }
  433. return response, nil
  434. }
  435. //DeleteAcls sends a delete acl request and returns a response or error
  436. func (b *Broker) DeleteAcls(request *DeleteAclsRequest) (*DeleteAclsResponse, error) {
  437. response := new(DeleteAclsResponse)
  438. err := b.sendAndReceive(request, response)
  439. if err != nil {
  440. return nil, err
  441. }
  442. return response, nil
  443. }
  444. //InitProducerID sends an init producer request and returns a response or error
  445. func (b *Broker) InitProducerID(request *InitProducerIDRequest) (*InitProducerIDResponse, error) {
  446. response := new(InitProducerIDResponse)
  447. err := b.sendAndReceive(request, response)
  448. if err != nil {
  449. return nil, err
  450. }
  451. return response, nil
  452. }
  453. //AddPartitionsToTxn send a request to add partition to txn and returns
  454. //a response or error
  455. func (b *Broker) AddPartitionsToTxn(request *AddPartitionsToTxnRequest) (*AddPartitionsToTxnResponse, error) {
  456. response := new(AddPartitionsToTxnResponse)
  457. err := b.sendAndReceive(request, response)
  458. if err != nil {
  459. return nil, err
  460. }
  461. return response, nil
  462. }
  463. //AddOffsetsToTxn sends a request to add offsets to txn and returns a response
  464. //or error
  465. func (b *Broker) AddOffsetsToTxn(request *AddOffsetsToTxnRequest) (*AddOffsetsToTxnResponse, error) {
  466. response := new(AddOffsetsToTxnResponse)
  467. err := b.sendAndReceive(request, response)
  468. if err != nil {
  469. return nil, err
  470. }
  471. return response, nil
  472. }
  473. //EndTxn sends a request to end txn and returns a response or error
  474. func (b *Broker) EndTxn(request *EndTxnRequest) (*EndTxnResponse, error) {
  475. response := new(EndTxnResponse)
  476. err := b.sendAndReceive(request, response)
  477. if err != nil {
  478. return nil, err
  479. }
  480. return response, nil
  481. }
  482. //TxnOffsetCommit sends a request to commit transaction offsets and returns
  483. //a response or error
  484. func (b *Broker) TxnOffsetCommit(request *TxnOffsetCommitRequest) (*TxnOffsetCommitResponse, error) {
  485. response := new(TxnOffsetCommitResponse)
  486. err := b.sendAndReceive(request, response)
  487. if err != nil {
  488. return nil, err
  489. }
  490. return response, nil
  491. }
  492. //DescribeConfigs sends a request to describe config and returns a response or
  493. //error
  494. func (b *Broker) DescribeConfigs(request *DescribeConfigsRequest) (*DescribeConfigsResponse, error) {
  495. response := new(DescribeConfigsResponse)
  496. err := b.sendAndReceive(request, response)
  497. if err != nil {
  498. return nil, err
  499. }
  500. return response, nil
  501. }
  502. //AlterConfigs sends a request to alter config and return a response or error
  503. func (b *Broker) AlterConfigs(request *AlterConfigsRequest) (*AlterConfigsResponse, error) {
  504. response := new(AlterConfigsResponse)
  505. err := b.sendAndReceive(request, response)
  506. if err != nil {
  507. return nil, err
  508. }
  509. return response, nil
  510. }
  511. //DeleteGroups sends a request to delete groups and returns a response or error
  512. func (b *Broker) DeleteGroups(request *DeleteGroupsRequest) (*DeleteGroupsResponse, error) {
  513. response := new(DeleteGroupsResponse)
  514. if err := b.sendAndReceive(request, response); err != nil {
  515. return nil, err
  516. }
  517. return response, nil
  518. }
  519. func (b *Broker) send(rb protocolBody, promiseResponse bool) (*responsePromise, error) {
  520. b.lock.Lock()
  521. defer b.lock.Unlock()
  522. if b.conn == nil {
  523. if b.connErr != nil {
  524. return nil, b.connErr
  525. }
  526. return nil, ErrNotConnected
  527. }
  528. if !b.conf.Version.IsAtLeast(rb.requiredVersion()) {
  529. return nil, ErrUnsupportedVersion
  530. }
  531. req := &request{correlationID: b.correlationID, clientID: b.conf.ClientID, body: rb}
  532. buf, err := encode(req, b.conf.MetricRegistry)
  533. if err != nil {
  534. return nil, err
  535. }
  536. err = b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout))
  537. if err != nil {
  538. return nil, err
  539. }
  540. requestTime := time.Now()
  541. bytes, err := b.conn.Write(buf)
  542. b.updateOutgoingCommunicationMetrics(bytes) //TODO: should it be after error check
  543. if err != nil {
  544. return nil, err
  545. }
  546. b.correlationID++
  547. if !promiseResponse {
  548. // Record request latency without the response
  549. b.updateRequestLatencyMetrics(time.Since(requestTime))
  550. return nil, nil
  551. }
  552. promise := responsePromise{requestTime, req.correlationID, make(chan []byte), make(chan error)}
  553. b.responses <- promise
  554. return &promise, nil
  555. }
  556. func (b *Broker) sendAndReceive(req protocolBody, res versionedDecoder) error {
  557. promise, err := b.send(req, res != nil)
  558. if err != nil {
  559. return err
  560. }
  561. if promise == nil {
  562. return nil
  563. }
  564. select {
  565. case buf := <-promise.packets:
  566. return versionedDecode(buf, res, req.version())
  567. case err = <-promise.errors:
  568. return err
  569. }
  570. }
  571. func (b *Broker) decode(pd packetDecoder, version int16) (err error) {
  572. b.id, err = pd.getInt32()
  573. if err != nil {
  574. return err
  575. }
  576. host, err := pd.getString()
  577. if err != nil {
  578. return err
  579. }
  580. port, err := pd.getInt32()
  581. if err != nil {
  582. return err
  583. }
  584. if version >= 1 {
  585. b.rack, err = pd.getNullableString()
  586. if err != nil {
  587. return err
  588. }
  589. }
  590. b.addr = net.JoinHostPort(host, fmt.Sprint(port))
  591. if _, _, err := net.SplitHostPort(b.addr); err != nil {
  592. return err
  593. }
  594. return nil
  595. }
  596. func (b *Broker) encode(pe packetEncoder, version int16) (err error) {
  597. host, portstr, err := net.SplitHostPort(b.addr)
  598. if err != nil {
  599. return err
  600. }
  601. port, err := strconv.Atoi(portstr)
  602. if err != nil {
  603. return err
  604. }
  605. pe.putInt32(b.id)
  606. err = pe.putString(host)
  607. if err != nil {
  608. return err
  609. }
  610. pe.putInt32(int32(port))
  611. if version >= 1 {
  612. err = pe.putNullableString(b.rack)
  613. if err != nil {
  614. return err
  615. }
  616. }
  617. return nil
  618. }
  619. func (b *Broker) responseReceiver() {
  620. var dead error
  621. header := make([]byte, 8)
  622. for response := range b.responses {
  623. if dead != nil {
  624. response.errors <- dead
  625. continue
  626. }
  627. err := b.conn.SetReadDeadline(time.Now().Add(b.conf.Net.ReadTimeout))
  628. if err != nil {
  629. dead = err
  630. response.errors <- err
  631. continue
  632. }
  633. bytesReadHeader, err := io.ReadFull(b.conn, header)
  634. requestLatency := time.Since(response.requestTime)
  635. if err != nil {
  636. b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
  637. dead = err
  638. response.errors <- err
  639. continue
  640. }
  641. decodedHeader := responseHeader{}
  642. err = decode(header, &decodedHeader)
  643. if err != nil {
  644. b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
  645. dead = err
  646. response.errors <- err
  647. continue
  648. }
  649. if decodedHeader.correlationID != response.correlationID {
  650. b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
  651. // TODO if decoded ID < cur ID, discard until we catch up
  652. // TODO if decoded ID > cur ID, save it so when cur ID catches up we have a response
  653. dead = PacketDecodingError{fmt.Sprintf("correlation ID didn't match, wanted %d, got %d", response.correlationID, decodedHeader.correlationID)}
  654. response.errors <- dead
  655. continue
  656. }
  657. buf := make([]byte, decodedHeader.length-4)
  658. bytesReadBody, err := io.ReadFull(b.conn, buf)
  659. b.updateIncomingCommunicationMetrics(bytesReadHeader+bytesReadBody, requestLatency)
  660. if err != nil {
  661. dead = err
  662. response.errors <- err
  663. continue
  664. }
  665. response.packets <- buf
  666. }
  667. close(b.done)
  668. }
  669. func (b *Broker) authenticateViaSASL() error {
  670. switch b.conf.Net.SASL.Mechanism {
  671. case SASLTypeOAuth:
  672. return b.sendAndReceiveSASLOAuth(b.conf.Net.SASL.TokenProvider)
  673. case SASLTypeSCRAMSHA256, SASLTypeSCRAMSHA512:
  674. return b.sendAndReceiveSASLSCRAMv1()
  675. default:
  676. return b.sendAndReceiveSASLPlainAuth()
  677. }
  678. }
  679. func (b *Broker) sendAndReceiveSASLHandshake(saslType SASLMechanism, version int16) error {
  680. rb := &SaslHandshakeRequest{Mechanism: string(saslType), Version: version}
  681. req := &request{correlationID: b.correlationID, clientID: b.conf.ClientID, body: rb}
  682. buf, err := encode(req, b.conf.MetricRegistry)
  683. if err != nil {
  684. return err
  685. }
  686. err = b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout))
  687. if err != nil {
  688. return err
  689. }
  690. requestTime := time.Now()
  691. bytes, err := b.conn.Write(buf)
  692. b.updateOutgoingCommunicationMetrics(bytes)
  693. if err != nil {
  694. Logger.Printf("Failed to send SASL handshake %s: %s\n", b.addr, err.Error())
  695. return err
  696. }
  697. b.correlationID++
  698. //wait for the response
  699. header := make([]byte, 8) // response header
  700. _, err = io.ReadFull(b.conn, header)
  701. if err != nil {
  702. Logger.Printf("Failed to read SASL handshake header : %s\n", err.Error())
  703. return err
  704. }
  705. length := binary.BigEndian.Uint32(header[:4])
  706. payload := make([]byte, length-4)
  707. n, err := io.ReadFull(b.conn, payload)
  708. if err != nil {
  709. Logger.Printf("Failed to read SASL handshake payload : %s\n", err.Error())
  710. return err
  711. }
  712. b.updateIncomingCommunicationMetrics(n+8, time.Since(requestTime))
  713. res := &SaslHandshakeResponse{}
  714. err = versionedDecode(payload, res, 0)
  715. if err != nil {
  716. Logger.Printf("Failed to parse SASL handshake : %s\n", err.Error())
  717. return err
  718. }
  719. if res.Err != ErrNoError {
  720. Logger.Printf("Invalid SASL Mechanism : %s\n", res.Err.Error())
  721. return res.Err
  722. }
  723. Logger.Print("Successful SASL handshake. Available mechanisms: ", res.EnabledMechanisms)
  724. return nil
  725. }
  726. // Kafka 0.10.0 plans to support SASL Plain and Kerberos as per PR #812 (KIP-43)/(JIRA KAFKA-3149)
  727. // Some hosted kafka services such as IBM Message Hub already offer SASL/PLAIN auth with Kafka 0.9
  728. //
  729. // In SASL Plain, Kafka expects the auth header to be in the following format
  730. // Message format (from https://tools.ietf.org/html/rfc4616):
  731. //
  732. // message = [authzid] UTF8NUL authcid UTF8NUL passwd
  733. // authcid = 1*SAFE ; MUST accept up to 255 octets
  734. // authzid = 1*SAFE ; MUST accept up to 255 octets
  735. // passwd = 1*SAFE ; MUST accept up to 255 octets
  736. // UTF8NUL = %x00 ; UTF-8 encoded NUL character
  737. //
  738. // SAFE = UTF1 / UTF2 / UTF3 / UTF4
  739. // ;; any UTF-8 encoded Unicode character except NUL
  740. //
  741. // When credentials are valid, Kafka returns a 4 byte array of null characters.
  742. // When credentials are invalid, Kafka closes the connection. This does not seem to be the ideal way
  743. // of responding to bad credentials but thats how its being done today.
  744. func (b *Broker) sendAndReceiveSASLPlainAuth() error {
  745. if b.conf.Net.SASL.Handshake {
  746. handshakeErr := b.sendAndReceiveSASLHandshake(SASLTypePlaintext, SASLHandshakeV0)
  747. if handshakeErr != nil {
  748. Logger.Printf("Error while performing SASL handshake %s\n", b.addr)
  749. return handshakeErr
  750. }
  751. }
  752. length := 1 + len(b.conf.Net.SASL.User) + 1 + len(b.conf.Net.SASL.Password)
  753. authBytes := make([]byte, length+4) //4 byte length header + auth data
  754. binary.BigEndian.PutUint32(authBytes, uint32(length))
  755. copy(authBytes[4:], []byte("\x00"+b.conf.Net.SASL.User+"\x00"+b.conf.Net.SASL.Password))
  756. err := b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout))
  757. if err != nil {
  758. Logger.Printf("Failed to set write deadline when doing SASL auth with broker %s: %s\n", b.addr, err.Error())
  759. return err
  760. }
  761. requestTime := time.Now()
  762. bytesWritten, err := b.conn.Write(authBytes)
  763. b.updateOutgoingCommunicationMetrics(bytesWritten)
  764. if err != nil {
  765. Logger.Printf("Failed to write SASL auth header to broker %s: %s\n", b.addr, err.Error())
  766. return err
  767. }
  768. header := make([]byte, 4)
  769. n, err := io.ReadFull(b.conn, header)
  770. b.updateIncomingCommunicationMetrics(n, time.Since(requestTime))
  771. // If the credentials are valid, we would get a 4 byte response filled with null characters.
  772. // Otherwise, the broker closes the connection and we get an EOF
  773. if err != nil {
  774. Logger.Printf("Failed to read response while authenticating with SASL to broker %s: %s\n", b.addr, err.Error())
  775. return err
  776. }
  777. Logger.Printf("SASL authentication successful with broker %s:%v - %v\n", b.addr, n, header)
  778. return nil
  779. }
  780. // sendAndReceiveSASLOAuth performs the authentication flow as described by KIP-255
  781. // https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=75968876
  782. func (b *Broker) sendAndReceiveSASLOAuth(provider AccessTokenProvider) error {
  783. if err := b.sendAndReceiveSASLHandshake(SASLTypeOAuth, SASLHandshakeV1); err != nil {
  784. return err
  785. }
  786. token, err := provider.Token()
  787. if err != nil {
  788. return err
  789. }
  790. requestTime := time.Now()
  791. correlationID := b.correlationID
  792. bytesWritten, err := b.sendSASLOAuthBearerClientResponse(token, correlationID)
  793. if err != nil {
  794. return err
  795. }
  796. b.updateOutgoingCommunicationMetrics(bytesWritten)
  797. b.correlationID++
  798. bytesRead, err := b.receiveSASLOAuthBearerServerResponse(correlationID)
  799. if err != nil {
  800. return err
  801. }
  802. requestLatency := time.Since(requestTime)
  803. b.updateIncomingCommunicationMetrics(bytesRead, requestLatency)
  804. return nil
  805. }
  806. func (b *Broker) sendAndReceiveSASLSCRAMv1() error {
  807. if err := b.sendAndReceiveSASLHandshake(b.conf.Net.SASL.Mechanism, SASLHandshakeV1); err != nil {
  808. return err
  809. }
  810. scramClient := b.conf.Net.SASL.SCRAMClient
  811. if err := scramClient.Begin(b.conf.Net.SASL.User, b.conf.Net.SASL.Password, b.conf.Net.SASL.SCRAMAuthzID); err != nil {
  812. return fmt.Errorf("failed to start SCRAM exchange with the server: %s", err.Error())
  813. }
  814. msg, err := scramClient.Step("")
  815. if err != nil {
  816. return fmt.Errorf("failed to advance the SCRAM exchange: %s", err.Error())
  817. }
  818. for !scramClient.Done() {
  819. requestTime := time.Now()
  820. correlationID := b.correlationID
  821. bytesWritten, err := b.sendSaslAuthenticateRequest(correlationID, []byte(msg))
  822. if err != nil {
  823. Logger.Printf("Failed to write SASL auth header to broker %s: %s\n", b.addr, err.Error())
  824. return err
  825. }
  826. b.updateOutgoingCommunicationMetrics(bytesWritten)
  827. b.correlationID++
  828. challenge, err := b.receiveSaslAuthenticateResponse(correlationID)
  829. if err != nil {
  830. Logger.Printf("Failed to read response while authenticating with SASL to broker %s: %s\n", b.addr, err.Error())
  831. return err
  832. }
  833. b.updateIncomingCommunicationMetrics(len(challenge), time.Since(requestTime))
  834. msg, err = scramClient.Step(string(challenge))
  835. if err != nil {
  836. Logger.Println("SASL authentication failed", err)
  837. return err
  838. }
  839. }
  840. Logger.Println("SASL authentication succeeded")
  841. return nil
  842. }
  843. func (b *Broker) sendSaslAuthenticateRequest(correlationID int32, msg []byte) (int, error) {
  844. rb := &SaslAuthenticateRequest{msg}
  845. req := &request{correlationID: correlationID, clientID: b.conf.ClientID, body: rb}
  846. buf, err := encode(req, b.conf.MetricRegistry)
  847. if err != nil {
  848. return 0, err
  849. }
  850. if err := b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout)); err != nil {
  851. return 0, err
  852. }
  853. return b.conn.Write(buf)
  854. }
  855. func (b *Broker) receiveSaslAuthenticateResponse(correlationID int32) ([]byte, error) {
  856. buf := make([]byte, responseLengthSize+correlationIDSize)
  857. _, err := io.ReadFull(b.conn, buf)
  858. if err != nil {
  859. return nil, err
  860. }
  861. header := responseHeader{}
  862. err = decode(buf, &header)
  863. if err != nil {
  864. return nil, err
  865. }
  866. if header.correlationID != correlationID {
  867. return nil, fmt.Errorf("correlation ID didn't match, wanted %d, got %d", b.correlationID, header.correlationID)
  868. }
  869. buf = make([]byte, header.length-correlationIDSize)
  870. _, err = io.ReadFull(b.conn, buf)
  871. if err != nil {
  872. return nil, err
  873. }
  874. res := &SaslAuthenticateResponse{}
  875. if err := versionedDecode(buf, res, 0); err != nil {
  876. return nil, err
  877. }
  878. if res.Err != ErrNoError {
  879. return nil, res.Err
  880. }
  881. return res.SaslAuthBytes, nil
  882. }
  883. // Build SASL/OAUTHBEARER initial client response as described by RFC-7628
  884. // https://tools.ietf.org/html/rfc7628
  885. func buildClientInitialResponse(token *AccessToken) ([]byte, error) {
  886. var ext string
  887. if token.Extensions != nil && len(token.Extensions) > 0 {
  888. if _, ok := token.Extensions[SASLExtKeyAuth]; ok {
  889. return []byte{}, fmt.Errorf("The extension `%s` is invalid", SASLExtKeyAuth)
  890. }
  891. ext = "\x01" + mapToString(token.Extensions, "=", "\x01")
  892. }
  893. resp := []byte(fmt.Sprintf("n,,\x01auth=Bearer %s%s\x01\x01", token.Token, ext))
  894. return resp, nil
  895. }
  // mapToString renders every entry of extensions as key+keyValSep+value, sorts
  // the rendered pairs lexicographically and joins them with elemSep.
  //
  // Sorting makes the result independent of map iteration order. Note that the
  // rendered pairs are compared as whole strings, not the keys alone. A nil or
  // empty map yields the empty string.
  func mapToString(extensions map[string]string, keyValSep string, elemSep string) string {
	pairs := make([]string, len(extensions))
	next := 0
	for key, value := range extensions {
		pairs[next] = key + keyValSep + value
		next++
	}

	sort.Strings(pairs)

	return strings.Join(pairs, elemSep)
  }
  906. func (b *Broker) sendSASLOAuthBearerClientResponse(token *AccessToken, correlationID int32) (int, error) {
  907. initialResp, err := buildClientInitialResponse(token)
  908. if err != nil {
  909. return 0, err
  910. }
  911. rb := &SaslAuthenticateRequest{initialResp}
  912. req := &request{correlationID: correlationID, clientID: b.conf.ClientID, body: rb}
  913. buf, err := encode(req, b.conf.MetricRegistry)
  914. if err != nil {
  915. return 0, err
  916. }
  917. if err := b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout)); err != nil {
  918. return 0, err
  919. }
  920. return b.conn.Write(buf)
  921. }
  922. func (b *Broker) receiveSASLOAuthBearerServerResponse(correlationID int32) (int, error) {
  923. buf := make([]byte, responseLengthSize+correlationIDSize)
  924. bytesRead, err := io.ReadFull(b.conn, buf)
  925. if err != nil {
  926. return bytesRead, err
  927. }
  928. header := responseHeader{}
  929. err = decode(buf, &header)
  930. if err != nil {
  931. return bytesRead, err
  932. }
  933. if header.correlationID != correlationID {
  934. return bytesRead, fmt.Errorf("correlation ID didn't match, wanted %d, got %d", b.correlationID, header.correlationID)
  935. }
  936. buf = make([]byte, header.length-correlationIDSize)
  937. c, err := io.ReadFull(b.conn, buf)
  938. bytesRead += c
  939. if err != nil {
  940. return bytesRead, err
  941. }
  942. res := &SaslAuthenticateResponse{}
  943. if err := versionedDecode(buf, res, 0); err != nil {
  944. return bytesRead, err
  945. }
  946. if res.Err != ErrNoError {
  947. return bytesRead, res.Err
  948. }
  949. if len(res.SaslAuthBytes) > 0 {
  950. Logger.Printf("Received SASL auth response: %s", res.SaslAuthBytes)
  951. }
  952. return bytesRead, nil
  953. }
  954. func (b *Broker) updateIncomingCommunicationMetrics(bytes int, requestLatency time.Duration) {
  955. b.updateRequestLatencyMetrics(requestLatency)
  956. b.responseRate.Mark(1)
  957. if b.brokerResponseRate != nil {
  958. b.brokerResponseRate.Mark(1)
  959. }
  960. responseSize := int64(bytes)
  961. b.incomingByteRate.Mark(responseSize)
  962. if b.brokerIncomingByteRate != nil {
  963. b.brokerIncomingByteRate.Mark(responseSize)
  964. }
  965. b.responseSize.Update(responseSize)
  966. if b.brokerResponseSize != nil {
  967. b.brokerResponseSize.Update(responseSize)
  968. }
  969. }
  970. func (b *Broker) updateRequestLatencyMetrics(requestLatency time.Duration) {
  971. requestLatencyInMs := int64(requestLatency / time.Millisecond)
  972. b.requestLatency.Update(requestLatencyInMs)
  973. if b.brokerRequestLatency != nil {
  974. b.brokerRequestLatency.Update(requestLatencyInMs)
  975. }
  976. }
  977. func (b *Broker) updateOutgoingCommunicationMetrics(bytes int) {
  978. b.requestRate.Mark(1)
  979. if b.brokerRequestRate != nil {
  980. b.brokerRequestRate.Mark(1)
  981. }
  982. requestSize := int64(bytes)
  983. b.outgoingByteRate.Mark(requestSize)
  984. if b.brokerOutgoingByteRate != nil {
  985. b.brokerOutgoingByteRate.Mark(requestSize)
  986. }
  987. b.requestSize.Update(requestSize)
  988. if b.brokerRequestSize != nil {
  989. b.brokerRequestSize.Update(requestSize)
  990. }
  991. }
  992. func (b *Broker) registerMetrics() {
  993. b.brokerIncomingByteRate = b.registerMeter("incoming-byte-rate")
  994. b.brokerRequestRate = b.registerMeter("request-rate")
  995. b.brokerRequestSize = b.registerHistogram("request-size")
  996. b.brokerRequestLatency = b.registerHistogram("request-latency-in-ms")
  997. b.brokerOutgoingByteRate = b.registerMeter("outgoing-byte-rate")
  998. b.brokerResponseRate = b.registerMeter("response-rate")
  999. b.brokerResponseSize = b.registerHistogram("response-size")
  1000. }
  1001. func (b *Broker) unregisterMetrics() {
  1002. for _, name := range b.registeredMetrics {
  1003. b.conf.MetricRegistry.Unregister(name)
  1004. }
  1005. }
  1006. func (b *Broker) registerMeter(name string) metrics.Meter {
  1007. nameForBroker := getMetricNameForBroker(name, b)
  1008. b.registeredMetrics = append(b.registeredMetrics, nameForBroker)
  1009. return metrics.GetOrRegisterMeter(nameForBroker, b.conf.MetricRegistry)
  1010. }
  1011. func (b *Broker) registerHistogram(name string) metrics.Histogram {
  1012. nameForBroker := getMetricNameForBroker(name, b)
  1013. b.registeredMetrics = append(b.registeredMetrics, nameForBroker)
  1014. return getOrRegisterHistogram(nameForBroker, b.conf.MetricRegistry)
  1015. }