broker.go 27 KB


  1. package sarama
  2. import (
  3. "crypto/tls"
  4. "encoding/binary"
  5. "fmt"
  6. "io"
  7. "net"
  8. "strconv"
  9. "sync"
  10. "sync/atomic"
  11. "time"
  12. "github.com/pkg/errors"
  13. "github.com/rcrowley/go-metrics"
  14. )
  15. // Broker represents a single Kafka broker connection. All operations on this object are entirely concurrency-safe.
  16. type Broker struct {
  17. id int32
  18. addr string
  19. rack *string
  20. conf *Config
  21. correlationID int32
  22. conn net.Conn
  23. connErr error
  24. lock sync.Mutex
  25. opened int32
  26. responses chan responsePromise
  27. done chan bool
  28. incomingByteRate metrics.Meter
  29. requestRate metrics.Meter
  30. requestSize metrics.Histogram
  31. requestLatency metrics.Histogram
  32. outgoingByteRate metrics.Meter
  33. responseRate metrics.Meter
  34. responseSize metrics.Histogram
  35. brokerIncomingByteRate metrics.Meter
  36. brokerRequestRate metrics.Meter
  37. brokerRequestSize metrics.Histogram
  38. brokerRequestLatency metrics.Histogram
  39. brokerOutgoingByteRate metrics.Meter
  40. brokerResponseRate metrics.Meter
  41. brokerResponseSize metrics.Histogram
  42. }
  43. // SASLMechanism specifies the SASL mechanism the client uses to authenticate with the broker
  44. type SASLMechanism string
  45. const (
  46. // SASLTypeOAuth represents the SASL/OAUTHBEARER mechanism (Kafka 2.0.0+)
  47. SASLTypeOAuth = "OAUTHBEARER"
  48. // SASLTypePlaintext represents the SASL/PLAIN mechanism
  49. SASLTypePlaintext = "PLAIN"
  50. // SASLHandshakeV0 is v0 of the Kafka SASL handshake protocol. Client and
  51. // server negotiate SASL auth using opaque packets.
  52. SASLHandshakeV0 = int16(0)
  53. // SASLHandshakeV1 is v1 of the Kafka SASL handshake protocol. Client and
  54. // server negotiate SASL by wrapping tokens with Kafka protocol headers.
  55. SASLHandshakeV1 = int16(1)
  56. )
  57. // AccessTokenProvider is the interface that encapsulates how implementors
  58. // can generate access tokens for Kafka broker authentication.
  59. type AccessTokenProvider interface {
  60. // Token returns an access token. Because this method may be called multiple
  61. // times, each invocation returns a new, unexpired token. This method should
  62. // not block indefinitely. A timeout error should be returned after a short
  63. // period of inactivity so that the broker connection logic can log
  64. // debugging information and retry.
  65. Token() (string, error)
  66. }
  67. type responsePromise struct {
  68. requestTime time.Time
  69. correlationID int32
  70. packets chan []byte
  71. errors chan error
  72. }
  73. // NewBroker creates and returns a Broker targeting the given host:port address.
  74. // This does not attempt to actually connect, you have to call Open() for that.
  75. func NewBroker(addr string) *Broker {
  76. return &Broker{id: -1, addr: addr}
  77. }
  78. // Open tries to connect to the Broker if it is not already connected or connecting, but does not block
  79. // waiting for the connection to complete. This means that any subsequent operations on the broker will
  80. // block waiting for the connection to succeed or fail. To get the effect of a fully synchronous Open call,
  81. // follow it by a call to Connected(). The only errors Open will return directly are ConfigurationError or
  82. // AlreadyConnected. If conf is nil, the result of NewConfig() is used.
  83. func (b *Broker) Open(conf *Config) error {
  84. if !atomic.CompareAndSwapInt32(&b.opened, 0, 1) {
  85. return ErrAlreadyConnected
  86. }
  87. if conf == nil {
  88. conf = NewConfig()
  89. }
  90. err := conf.Validate()
  91. if err != nil {
  92. return err
  93. }
  94. b.lock.Lock()
  95. go withRecover(func() {
  96. defer b.lock.Unlock()
  97. dialer := net.Dialer{
  98. Timeout: conf.Net.DialTimeout,
  99. KeepAlive: conf.Net.KeepAlive,
  100. LocalAddr: conf.Net.LocalAddr,
  101. }
  102. if conf.Net.TLS.Enable {
  103. b.conn, b.connErr = tls.DialWithDialer(&dialer, "tcp", b.addr, conf.Net.TLS.Config)
  104. } else {
  105. b.conn, b.connErr = dialer.Dial("tcp", b.addr)
  106. }
  107. if b.connErr != nil {
  108. Logger.Printf("Failed to connect to broker %s: %s\n", b.addr, b.connErr)
  109. b.conn = nil
  110. atomic.StoreInt32(&b.opened, 0)
  111. return
  112. }
  113. b.conn = newBufConn(b.conn)
  114. b.conf = conf
  115. // Create or reuse the global metrics shared between brokers
  116. b.incomingByteRate = metrics.GetOrRegisterMeter("incoming-byte-rate", conf.MetricRegistry)
  117. b.requestRate = metrics.GetOrRegisterMeter("request-rate", conf.MetricRegistry)
  118. b.requestSize = getOrRegisterHistogram("request-size", conf.MetricRegistry)
  119. b.requestLatency = getOrRegisterHistogram("request-latency-in-ms", conf.MetricRegistry)
  120. b.outgoingByteRate = metrics.GetOrRegisterMeter("outgoing-byte-rate", conf.MetricRegistry)
  121. b.responseRate = metrics.GetOrRegisterMeter("response-rate", conf.MetricRegistry)
  122. b.responseSize = getOrRegisterHistogram("response-size", conf.MetricRegistry)
  123. // Do not gather metrics for seeded broker (only used during bootstrap) because they share
  124. // the same id (-1) and are already exposed through the global metrics above
  125. if b.id >= 0 {
  126. b.brokerIncomingByteRate = getOrRegisterBrokerMeter("incoming-byte-rate", b, conf.MetricRegistry)
  127. b.brokerRequestRate = getOrRegisterBrokerMeter("request-rate", b, conf.MetricRegistry)
  128. b.brokerRequestSize = getOrRegisterBrokerHistogram("request-size", b, conf.MetricRegistry)
  129. b.brokerRequestLatency = getOrRegisterBrokerHistogram("request-latency-in-ms", b, conf.MetricRegistry)
  130. b.brokerOutgoingByteRate = getOrRegisterBrokerMeter("outgoing-byte-rate", b, conf.MetricRegistry)
  131. b.brokerResponseRate = getOrRegisterBrokerMeter("response-rate", b, conf.MetricRegistry)
  132. b.brokerResponseSize = getOrRegisterBrokerHistogram("response-size", b, conf.MetricRegistry)
  133. }
  134. if conf.Net.SASL.Enable {
  135. switch conf.Net.SASL.Mechanism {
  136. case SASLTypeOAuth:
  137. b.connErr = b.sendAndReceiveSASLOAuth(conf.Net.SASL.TokenProvider)
  138. case SASLTypePlaintext:
  139. b.connErr = b.sendAndReceiveSASLPlainAuth()
  140. default:
  141. b.connErr = b.sendAndReceiveSASLPlainAuth()
  142. }
  143. if b.connErr != nil {
  144. err = b.conn.Close()
  145. if err == nil {
  146. Logger.Printf("Closed connection to broker %s\n", b.addr)
  147. } else {
  148. Logger.Printf("Error while closing connection to broker %s: %s\n", b.addr, err)
  149. }
  150. b.conn = nil
  151. atomic.StoreInt32(&b.opened, 0)
  152. return
  153. }
  154. }
  155. b.done = make(chan bool)
  156. b.responses = make(chan responsePromise, b.conf.Net.MaxOpenRequests-1)
  157. if b.id >= 0 {
  158. Logger.Printf("Connected to broker at %s (registered as #%d)\n", b.addr, b.id)
  159. } else {
  160. Logger.Printf("Connected to broker at %s (unregistered)\n", b.addr)
  161. }
  162. go withRecover(b.responseReceiver)
  163. })
  164. return nil
  165. }
  166. // Connected returns true if the broker is connected and false otherwise. If the broker is not
  167. // connected but it had tried to connect, the error from that connection attempt is also returned.
  168. func (b *Broker) Connected() (bool, error) {
  169. b.lock.Lock()
  170. defer b.lock.Unlock()
  171. return b.conn != nil, b.connErr
  172. }
  173. func (b *Broker) Close() error {
  174. b.lock.Lock()
  175. defer b.lock.Unlock()
  176. if b.conn == nil {
  177. return ErrNotConnected
  178. }
  179. close(b.responses)
  180. <-b.done
  181. err := b.conn.Close()
  182. b.conn = nil
  183. b.connErr = nil
  184. b.done = nil
  185. b.responses = nil
  186. if b.id >= 0 {
  187. b.conf.MetricRegistry.Unregister(getMetricNameForBroker("incoming-byte-rate", b))
  188. b.conf.MetricRegistry.Unregister(getMetricNameForBroker("request-rate", b))
  189. b.conf.MetricRegistry.Unregister(getMetricNameForBroker("outgoing-byte-rate", b))
  190. b.conf.MetricRegistry.Unregister(getMetricNameForBroker("response-rate", b))
  191. }
  192. if err == nil {
  193. Logger.Printf("Closed connection to broker %s\n", b.addr)
  194. } else {
  195. Logger.Printf("Error while closing connection to broker %s: %s\n", b.addr, err)
  196. }
  197. atomic.StoreInt32(&b.opened, 0)
  198. return err
  199. }
  200. // ID returns the broker ID retrieved from Kafka's metadata, or -1 if that is not known.
  201. func (b *Broker) ID() int32 {
  202. return b.id
  203. }
  204. // Addr returns the broker address as either retrieved from Kafka's metadata or passed to NewBroker.
  205. func (b *Broker) Addr() string {
  206. return b.addr
  207. }
  208. // Rack returns the broker's rack as retrieved from Kafka's metadata or the
  209. // empty string if it is not known. The returned value corresponds to the
  210. // broker's broker.rack configuration setting. Requires protocol version to be
  211. // at least v0.10.0.0.
  212. func (b *Broker) Rack() string {
  213. if b.rack == nil {
  214. return ""
  215. }
  216. return *b.rack
  217. }
  218. func (b *Broker) GetMetadata(request *MetadataRequest) (*MetadataResponse, error) {
  219. response := new(MetadataResponse)
  220. err := b.sendAndReceive(request, response)
  221. if err != nil {
  222. return nil, err
  223. }
  224. return response, nil
  225. }
  226. func (b *Broker) GetConsumerMetadata(request *ConsumerMetadataRequest) (*ConsumerMetadataResponse, error) {
  227. response := new(ConsumerMetadataResponse)
  228. err := b.sendAndReceive(request, response)
  229. if err != nil {
  230. return nil, err
  231. }
  232. return response, nil
  233. }
  234. func (b *Broker) FindCoordinator(request *FindCoordinatorRequest) (*FindCoordinatorResponse, error) {
  235. response := new(FindCoordinatorResponse)
  236. err := b.sendAndReceive(request, response)
  237. if err != nil {
  238. return nil, err
  239. }
  240. return response, nil
  241. }
  242. func (b *Broker) GetAvailableOffsets(request *OffsetRequest) (*OffsetResponse, error) {
  243. response := new(OffsetResponse)
  244. err := b.sendAndReceive(request, response)
  245. if err != nil {
  246. return nil, err
  247. }
  248. return response, nil
  249. }
  250. func (b *Broker) Produce(request *ProduceRequest) (*ProduceResponse, error) {
  251. var response *ProduceResponse
  252. var err error
  253. if request.RequiredAcks == NoResponse {
  254. err = b.sendAndReceive(request, nil)
  255. } else {
  256. response = new(ProduceResponse)
  257. err = b.sendAndReceive(request, response)
  258. }
  259. if err != nil {
  260. return nil, err
  261. }
  262. return response, nil
  263. }
  264. func (b *Broker) Fetch(request *FetchRequest) (*FetchResponse, error) {
  265. response := new(FetchResponse)
  266. err := b.sendAndReceive(request, response)
  267. if err != nil {
  268. return nil, err
  269. }
  270. return response, nil
  271. }
  272. func (b *Broker) CommitOffset(request *OffsetCommitRequest) (*OffsetCommitResponse, error) {
  273. response := new(OffsetCommitResponse)
  274. err := b.sendAndReceive(request, response)
  275. if err != nil {
  276. return nil, err
  277. }
  278. return response, nil
  279. }
  280. func (b *Broker) FetchOffset(request *OffsetFetchRequest) (*OffsetFetchResponse, error) {
  281. response := new(OffsetFetchResponse)
  282. err := b.sendAndReceive(request, response)
  283. if err != nil {
  284. return nil, err
  285. }
  286. return response, nil
  287. }
  288. func (b *Broker) JoinGroup(request *JoinGroupRequest) (*JoinGroupResponse, error) {
  289. response := new(JoinGroupResponse)
  290. err := b.sendAndReceive(request, response)
  291. if err != nil {
  292. return nil, err
  293. }
  294. return response, nil
  295. }
  296. func (b *Broker) SyncGroup(request *SyncGroupRequest) (*SyncGroupResponse, error) {
  297. response := new(SyncGroupResponse)
  298. err := b.sendAndReceive(request, response)
  299. if err != nil {
  300. return nil, err
  301. }
  302. return response, nil
  303. }
  304. func (b *Broker) LeaveGroup(request *LeaveGroupRequest) (*LeaveGroupResponse, error) {
  305. response := new(LeaveGroupResponse)
  306. err := b.sendAndReceive(request, response)
  307. if err != nil {
  308. return nil, err
  309. }
  310. return response, nil
  311. }
  312. func (b *Broker) Heartbeat(request *HeartbeatRequest) (*HeartbeatResponse, error) {
  313. response := new(HeartbeatResponse)
  314. err := b.sendAndReceive(request, response)
  315. if err != nil {
  316. return nil, err
  317. }
  318. return response, nil
  319. }
  320. func (b *Broker) ListGroups(request *ListGroupsRequest) (*ListGroupsResponse, error) {
  321. response := new(ListGroupsResponse)
  322. err := b.sendAndReceive(request, response)
  323. if err != nil {
  324. return nil, err
  325. }
  326. return response, nil
  327. }
  328. func (b *Broker) DescribeGroups(request *DescribeGroupsRequest) (*DescribeGroupsResponse, error) {
  329. response := new(DescribeGroupsResponse)
  330. err := b.sendAndReceive(request, response)
  331. if err != nil {
  332. return nil, err
  333. }
  334. return response, nil
  335. }
  336. func (b *Broker) ApiVersions(request *ApiVersionsRequest) (*ApiVersionsResponse, error) {
  337. response := new(ApiVersionsResponse)
  338. err := b.sendAndReceive(request, response)
  339. if err != nil {
  340. return nil, err
  341. }
  342. return response, nil
  343. }
  344. func (b *Broker) CreateTopics(request *CreateTopicsRequest) (*CreateTopicsResponse, error) {
  345. response := new(CreateTopicsResponse)
  346. err := b.sendAndReceive(request, response)
  347. if err != nil {
  348. return nil, err
  349. }
  350. return response, nil
  351. }
  352. func (b *Broker) DeleteTopics(request *DeleteTopicsRequest) (*DeleteTopicsResponse, error) {
  353. response := new(DeleteTopicsResponse)
  354. err := b.sendAndReceive(request, response)
  355. if err != nil {
  356. return nil, err
  357. }
  358. return response, nil
  359. }
  360. func (b *Broker) CreatePartitions(request *CreatePartitionsRequest) (*CreatePartitionsResponse, error) {
  361. response := new(CreatePartitionsResponse)
  362. err := b.sendAndReceive(request, response)
  363. if err != nil {
  364. return nil, err
  365. }
  366. return response, nil
  367. }
  368. func (b *Broker) DeleteRecords(request *DeleteRecordsRequest) (*DeleteRecordsResponse, error) {
  369. response := new(DeleteRecordsResponse)
  370. err := b.sendAndReceive(request, response)
  371. if err != nil {
  372. return nil, err
  373. }
  374. return response, nil
  375. }
  376. func (b *Broker) DescribeAcls(request *DescribeAclsRequest) (*DescribeAclsResponse, error) {
  377. response := new(DescribeAclsResponse)
  378. err := b.sendAndReceive(request, response)
  379. if err != nil {
  380. return nil, err
  381. }
  382. return response, nil
  383. }
  384. func (b *Broker) CreateAcls(request *CreateAclsRequest) (*CreateAclsResponse, error) {
  385. response := new(CreateAclsResponse)
  386. err := b.sendAndReceive(request, response)
  387. if err != nil {
  388. return nil, err
  389. }
  390. return response, nil
  391. }
  392. func (b *Broker) DeleteAcls(request *DeleteAclsRequest) (*DeleteAclsResponse, error) {
  393. response := new(DeleteAclsResponse)
  394. err := b.sendAndReceive(request, response)
  395. if err != nil {
  396. return nil, err
  397. }
  398. return response, nil
  399. }
  400. func (b *Broker) InitProducerID(request *InitProducerIDRequest) (*InitProducerIDResponse, error) {
  401. response := new(InitProducerIDResponse)
  402. err := b.sendAndReceive(request, response)
  403. if err != nil {
  404. return nil, err
  405. }
  406. return response, nil
  407. }
  408. func (b *Broker) AddPartitionsToTxn(request *AddPartitionsToTxnRequest) (*AddPartitionsToTxnResponse, error) {
  409. response := new(AddPartitionsToTxnResponse)
  410. err := b.sendAndReceive(request, response)
  411. if err != nil {
  412. return nil, err
  413. }
  414. return response, nil
  415. }
  416. func (b *Broker) AddOffsetsToTxn(request *AddOffsetsToTxnRequest) (*AddOffsetsToTxnResponse, error) {
  417. response := new(AddOffsetsToTxnResponse)
  418. err := b.sendAndReceive(request, response)
  419. if err != nil {
  420. return nil, err
  421. }
  422. return response, nil
  423. }
  424. func (b *Broker) EndTxn(request *EndTxnRequest) (*EndTxnResponse, error) {
  425. response := new(EndTxnResponse)
  426. err := b.sendAndReceive(request, response)
  427. if err != nil {
  428. return nil, err
  429. }
  430. return response, nil
  431. }
  432. func (b *Broker) TxnOffsetCommit(request *TxnOffsetCommitRequest) (*TxnOffsetCommitResponse, error) {
  433. response := new(TxnOffsetCommitResponse)
  434. err := b.sendAndReceive(request, response)
  435. if err != nil {
  436. return nil, err
  437. }
  438. return response, nil
  439. }
  440. func (b *Broker) DescribeConfigs(request *DescribeConfigsRequest) (*DescribeConfigsResponse, error) {
  441. response := new(DescribeConfigsResponse)
  442. err := b.sendAndReceive(request, response)
  443. if err != nil {
  444. return nil, err
  445. }
  446. return response, nil
  447. }
  448. func (b *Broker) AlterConfigs(request *AlterConfigsRequest) (*AlterConfigsResponse, error) {
  449. response := new(AlterConfigsResponse)
  450. err := b.sendAndReceive(request, response)
  451. if err != nil {
  452. return nil, err
  453. }
  454. return response, nil
  455. }
  456. func (b *Broker) DeleteGroups(request *DeleteGroupsRequest) (*DeleteGroupsResponse, error) {
  457. response := new(DeleteGroupsResponse)
  458. if err := b.sendAndReceive(request, response); err != nil {
  459. return nil, err
  460. }
  461. return response, nil
  462. }
  463. func (b *Broker) send(rb protocolBody, promiseResponse bool) (*responsePromise, error) {
  464. b.lock.Lock()
  465. defer b.lock.Unlock()
  466. if b.conn == nil {
  467. if b.connErr != nil {
  468. return nil, b.connErr
  469. }
  470. return nil, ErrNotConnected
  471. }
  472. if !b.conf.Version.IsAtLeast(rb.requiredVersion()) {
  473. return nil, ErrUnsupportedVersion
  474. }
  475. req := &request{correlationID: b.correlationID, clientID: b.conf.ClientID, body: rb}
  476. buf, err := encode(req, b.conf.MetricRegistry)
  477. if err != nil {
  478. return nil, err
  479. }
  480. err = b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout))
  481. if err != nil {
  482. return nil, err
  483. }
  484. requestTime := time.Now()
  485. bytes, err := b.conn.Write(buf)
  486. b.updateOutgoingCommunicationMetrics(bytes)
  487. if err != nil {
  488. return nil, err
  489. }
  490. b.correlationID++
  491. if !promiseResponse {
  492. // Record request latency without the response
  493. b.updateRequestLatencyMetrics(time.Since(requestTime))
  494. return nil, nil
  495. }
  496. promise := responsePromise{requestTime, req.correlationID, make(chan []byte), make(chan error)}
  497. b.responses <- promise
  498. return &promise, nil
  499. }
  500. func (b *Broker) sendAndReceive(req protocolBody, res versionedDecoder) error {
  501. promise, err := b.send(req, res != nil)
  502. if err != nil {
  503. return err
  504. }
  505. if promise == nil {
  506. return nil
  507. }
  508. select {
  509. case buf := <-promise.packets:
  510. return versionedDecode(buf, res, req.version())
  511. case err = <-promise.errors:
  512. return err
  513. }
  514. }
  515. func (b *Broker) decode(pd packetDecoder, version int16) (err error) {
  516. b.id, err = pd.getInt32()
  517. if err != nil {
  518. return err
  519. }
  520. host, err := pd.getString()
  521. if err != nil {
  522. return err
  523. }
  524. port, err := pd.getInt32()
  525. if err != nil {
  526. return err
  527. }
  528. if version >= 1 {
  529. b.rack, err = pd.getNullableString()
  530. if err != nil {
  531. return err
  532. }
  533. }
  534. b.addr = net.JoinHostPort(host, fmt.Sprint(port))
  535. if _, _, err := net.SplitHostPort(b.addr); err != nil {
  536. return err
  537. }
  538. return nil
  539. }
  540. func (b *Broker) encode(pe packetEncoder, version int16) (err error) {
  541. host, portstr, err := net.SplitHostPort(b.addr)
  542. if err != nil {
  543. return err
  544. }
  545. port, err := strconv.Atoi(portstr)
  546. if err != nil {
  547. return err
  548. }
  549. pe.putInt32(b.id)
  550. err = pe.putString(host)
  551. if err != nil {
  552. return err
  553. }
  554. pe.putInt32(int32(port))
  555. if version >= 1 {
  556. err = pe.putNullableString(b.rack)
  557. if err != nil {
  558. return err
  559. }
  560. }
  561. return nil
  562. }
  563. func (b *Broker) responseReceiver() {
  564. var dead error
  565. header := make([]byte, 8)
  566. for response := range b.responses {
  567. if dead != nil {
  568. response.errors <- dead
  569. continue
  570. }
  571. err := b.conn.SetReadDeadline(time.Now().Add(b.conf.Net.ReadTimeout))
  572. if err != nil {
  573. dead = err
  574. response.errors <- err
  575. continue
  576. }
  577. bytesReadHeader, err := io.ReadFull(b.conn, header)
  578. requestLatency := time.Since(response.requestTime)
  579. if err != nil {
  580. b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
  581. dead = err
  582. response.errors <- err
  583. continue
  584. }
  585. decodedHeader := responseHeader{}
  586. err = decode(header, &decodedHeader)
  587. if err != nil {
  588. b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
  589. dead = err
  590. response.errors <- err
  591. continue
  592. }
  593. if decodedHeader.correlationID != response.correlationID {
  594. b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
  595. // TODO if decoded ID < cur ID, discard until we catch up
  596. // TODO if decoded ID > cur ID, save it so when cur ID catches up we have a response
  597. dead = PacketDecodingError{fmt.Sprintf("correlation ID didn't match, wanted %d, got %d", response.correlationID, decodedHeader.correlationID)}
  598. response.errors <- dead
  599. continue
  600. }
  601. buf := make([]byte, decodedHeader.length-4)
  602. bytesReadBody, err := io.ReadFull(b.conn, buf)
  603. b.updateIncomingCommunicationMetrics(bytesReadHeader+bytesReadBody, requestLatency)
  604. if err != nil {
  605. dead = err
  606. response.errors <- err
  607. continue
  608. }
  609. response.packets <- buf
  610. }
  611. close(b.done)
  612. }
  613. func (b *Broker) sendAndReceiveSASLHandshake(saslType string, version int16) error {
  614. rb := &SaslHandshakeRequest{Mechanism: saslType, Version: version}
  615. req := &request{correlationID: b.correlationID, clientID: b.conf.ClientID, body: rb}
  616. buf, err := encode(req, b.conf.MetricRegistry)
  617. if err != nil {
  618. return err
  619. }
  620. err = b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout))
  621. if err != nil {
  622. return err
  623. }
  624. requestTime := time.Now()
  625. bytes, err := b.conn.Write(buf)
  626. b.updateOutgoingCommunicationMetrics(bytes)
  627. if err != nil {
  628. Logger.Printf("Failed to send SASL handshake %s: %s\n", b.addr, err.Error())
  629. return err
  630. }
  631. b.correlationID++
  632. //wait for the response
  633. header := make([]byte, 8) // response header
  634. _, err = io.ReadFull(b.conn, header)
  635. if err != nil {
  636. Logger.Printf("Failed to read SASL handshake header : %s\n", err.Error())
  637. return err
  638. }
  639. length := binary.BigEndian.Uint32(header[:4])
  640. payload := make([]byte, length-4)
  641. n, err := io.ReadFull(b.conn, payload)
  642. if err != nil {
  643. Logger.Printf("Failed to read SASL handshake payload : %s\n", err.Error())
  644. return err
  645. }
  646. b.updateIncomingCommunicationMetrics(n+8, time.Since(requestTime))
  647. res := &SaslHandshakeResponse{}
  648. err = versionedDecode(payload, res, 0)
  649. if err != nil {
  650. Logger.Printf("Failed to parse SASL handshake : %s\n", err.Error())
  651. return err
  652. }
  653. if res.Err != ErrNoError {
  654. Logger.Printf("Invalid SASL Mechanism : %s\n", res.Err.Error())
  655. return res.Err
  656. }
  657. Logger.Print("Successful SASL handshake")
  658. return nil
  659. }
  660. // Kafka 0.10.0 plans to support SASL Plain and Kerberos as per PR #812 (KIP-43)/(JIRA KAFKA-3149)
  661. // Some hosted kafka services such as IBM Message Hub already offer SASL/PLAIN auth with Kafka 0.9
  662. //
  663. // In SASL Plain, Kafka expects the auth header to be in the following format
  664. // Message format (from https://tools.ietf.org/html/rfc4616):
  665. //
  666. // message = [authzid] UTF8NUL authcid UTF8NUL passwd
  667. // authcid = 1*SAFE ; MUST accept up to 255 octets
  668. // authzid = 1*SAFE ; MUST accept up to 255 octets
  669. // passwd = 1*SAFE ; MUST accept up to 255 octets
  670. // UTF8NUL = %x00 ; UTF-8 encoded NUL character
  671. //
  672. // SAFE = UTF1 / UTF2 / UTF3 / UTF4
  673. // ;; any UTF-8 encoded Unicode character except NUL
  674. //
  675. // When credentials are valid, Kafka returns a 4 byte array of null characters.
  676. // When credentials are invalid, Kafka closes the connection. This does not seem to be the ideal way
  677. // of responding to bad credentials but thats how its being done today.
  678. func (b *Broker) sendAndReceiveSASLPlainAuth() error {
  679. if b.conf.Net.SASL.Handshake {
  680. handshakeErr := b.sendAndReceiveSASLHandshake(SASLTypePlaintext, SASLHandshakeV0)
  681. if handshakeErr != nil {
  682. Logger.Printf("Error while performing SASL handshake %s\n", b.addr)
  683. return handshakeErr
  684. }
  685. }
  686. length := 1 + len(b.conf.Net.SASL.User) + 1 + len(b.conf.Net.SASL.Password)
  687. authBytes := make([]byte, length+4) //4 byte length header + auth data
  688. binary.BigEndian.PutUint32(authBytes, uint32(length))
  689. copy(authBytes[4:], []byte("\x00"+b.conf.Net.SASL.User+"\x00"+b.conf.Net.SASL.Password))
  690. err := b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout))
  691. if err != nil {
  692. Logger.Printf("Failed to set write deadline when doing SASL auth with broker %s: %s\n", b.addr, err.Error())
  693. return err
  694. }
  695. requestTime := time.Now()
  696. bytesWritten, err := b.conn.Write(authBytes)
  697. b.updateOutgoingCommunicationMetrics(bytesWritten)
  698. if err != nil {
  699. Logger.Printf("Failed to write SASL auth header to broker %s: %s\n", b.addr, err.Error())
  700. return err
  701. }
  702. header := make([]byte, 4)
  703. n, err := io.ReadFull(b.conn, header)
  704. b.updateIncomingCommunicationMetrics(n, time.Since(requestTime))
  705. // If the credentials are valid, we would get a 4 byte response filled with null characters.
  706. // Otherwise, the broker closes the connection and we get an EOF
  707. if err != nil {
  708. Logger.Printf("Failed to read response while authenticating with SASL to broker %s: %s\n", b.addr, err.Error())
  709. return err
  710. }
  711. Logger.Printf("SASL authentication successful with broker %s:%v - %v\n", b.addr, n, header)
  712. return nil
  713. }
  714. // sendAndReceiveSASLOAuth performs the authentication flow as described by KIP-255
  715. // https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=75968876
  716. func (b *Broker) sendAndReceiveSASLOAuth(tokenProvider AccessTokenProvider) error {
  717. if err := b.sendAndReceiveSASLHandshake(SASLTypeOAuth, SASLHandshakeV1); err != nil {
  718. return err
  719. }
  720. token, err := tokenProvider.Token()
  721. if err != nil {
  722. return err
  723. }
  724. requestTime := time.Now()
  725. correlationID := b.correlationID
  726. bytesWritten, err := b.sendSASLOAuthBearerClientResponse(token, correlationID)
  727. if err != nil {
  728. return err
  729. }
  730. Logger.Printf("Correlation ID %d ", b.correlationID)
  731. b.updateOutgoingCommunicationMetrics(bytesWritten)
  732. b.correlationID++
  733. bytesRead, err := b.receiveSASLOAuthBearerServerResponse(correlationID)
  734. if err != nil {
  735. return err
  736. }
  737. requestLatency := time.Since(requestTime)
  738. b.updateIncomingCommunicationMetrics(bytesRead, requestLatency)
  739. return nil
  740. }
  741. func (b *Broker) sendSASLOAuthBearerClientResponse(bearerToken string, correlationID int32) (int, error) {
  742. // Initial client response as described by RFC-7628
  743. // https://tools.ietf.org/html/rfc7628
  744. oauthRequest := []byte(fmt.Sprintf("n,,\x01auth=Bearer %s\x01\x01", bearerToken))
  745. rb := &SaslAuthenticateRequest{oauthRequest}
  746. req := &request{correlationID: correlationID, clientID: b.conf.ClientID, body: rb}
  747. buf, err := encode(req, b.conf.MetricRegistry)
  748. if err != nil {
  749. return 0, err
  750. }
  751. if err := b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout)); err != nil {
  752. return 0, err
  753. }
  754. bytesWritten, err := b.conn.Write(buf)
  755. if err != nil {
  756. return 0, err
  757. }
  758. return bytesWritten, nil
  759. }
  760. func (b *Broker) receiveSASLOAuthBearerServerResponse(correlationID int32) (int, error) {
  761. buf := make([]byte, 8)
  762. bytesRead, err := io.ReadFull(b.conn, buf)
  763. if err != nil {
  764. return bytesRead, err
  765. }
  766. header := responseHeader{}
  767. err = decode(buf, &header)
  768. if err != nil {
  769. return bytesRead, err
  770. }
  771. if header.correlationID != correlationID {
  772. return bytesRead, errors.Errorf("correlation ID didn't match, wanted %d, got %d", b.correlationID, header.correlationID)
  773. }
  774. buf = make([]byte, header.length-4)
  775. c, err := io.ReadFull(b.conn, buf)
  776. bytesRead += c
  777. if err != nil {
  778. return bytesRead, err
  779. }
  780. res := &SaslAuthenticateResponse{}
  781. if err := versionedDecode(buf, res, 0); err != nil {
  782. return bytesRead, err
  783. }
  784. if res.Err != ErrNoError {
  785. return bytesRead, res.Err
  786. }
  787. return bytesRead, nil
  788. }
  789. func (b *Broker) updateIncomingCommunicationMetrics(bytes int, requestLatency time.Duration) {
  790. b.updateRequestLatencyMetrics(requestLatency)
  791. b.responseRate.Mark(1)
  792. if b.brokerResponseRate != nil {
  793. b.brokerResponseRate.Mark(1)
  794. }
  795. responseSize := int64(bytes)
  796. b.incomingByteRate.Mark(responseSize)
  797. if b.brokerIncomingByteRate != nil {
  798. b.brokerIncomingByteRate.Mark(responseSize)
  799. }
  800. b.responseSize.Update(responseSize)
  801. if b.brokerResponseSize != nil {
  802. b.brokerResponseSize.Update(responseSize)
  803. }
  804. }
  805. func (b *Broker) updateRequestLatencyMetrics(requestLatency time.Duration) {
  806. requestLatencyInMs := int64(requestLatency / time.Millisecond)
  807. b.requestLatency.Update(requestLatencyInMs)
  808. if b.brokerRequestLatency != nil {
  809. b.brokerRequestLatency.Update(requestLatencyInMs)
  810. }
  811. }
  812. func (b *Broker) updateOutgoingCommunicationMetrics(bytes int) {
  813. b.requestRate.Mark(1)
  814. if b.brokerRequestRate != nil {
  815. b.brokerRequestRate.Mark(1)
  816. }
  817. requestSize := int64(bytes)
  818. b.outgoingByteRate.Mark(requestSize)
  819. if b.brokerOutgoingByteRate != nil {
  820. b.brokerOutgoingByteRate.Mark(requestSize)
  821. }
  822. b.requestSize.Update(requestSize)
  823. if b.brokerRequestSize != nil {
  824. b.brokerRequestSize.Update(requestSize)
  825. }
  826. }