broker.go 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823
  1. package sarama
  2. import (
  3. "crypto/tls"
  4. "encoding/binary"
  5. "fmt"
  6. "io"
  7. "net"
  8. "strconv"
  9. "sync"
  10. "sync/atomic"
  11. "time"
  12. "github.com/rcrowley/go-metrics"
  13. )
  14. // Broker represents a single Kafka broker connection. All operations on this object are entirely concurrency-safe.
  15. type Broker struct {
  16. id int32
  17. addr string
  18. conf *Config
  19. correlationID int32
  20. conn net.Conn
  21. connErr error
  22. lock sync.Mutex
  23. opened int32
  24. responses chan responsePromise
  25. done chan bool
  26. incomingByteRate metrics.Meter
  27. requestRate metrics.Meter
  28. requestSize metrics.Histogram
  29. requestLatency metrics.Histogram
  30. outgoingByteRate metrics.Meter
  31. responseRate metrics.Meter
  32. responseSize metrics.Histogram
  33. brokerIncomingByteRate metrics.Meter
  34. brokerRequestRate metrics.Meter
  35. brokerRequestSize metrics.Histogram
  36. brokerRequestLatency metrics.Histogram
  37. brokerOutgoingByteRate metrics.Meter
  38. brokerResponseRate metrics.Meter
  39. brokerResponseSize metrics.Histogram
  40. }
  41. type responsePromise struct {
  42. requestTime time.Time
  43. correlationID int32
  44. packets chan []byte
  45. errors chan error
  46. }
  47. // NewBroker creates and returns a Broker targeting the given host:port address.
  48. // This does not attempt to actually connect, you have to call Open() for that.
  49. func NewBroker(addr string) *Broker {
  50. return &Broker{id: -1, addr: addr}
  51. }
  52. // Open tries to connect to the Broker if it is not already connected or connecting, but does not block
  53. // waiting for the connection to complete. This means that any subsequent operations on the broker will
  54. // block waiting for the connection to succeed or fail. To get the effect of a fully synchronous Open call,
  55. // follow it by a call to Connected(). The only errors Open will return directly are ConfigurationError or
  56. // AlreadyConnected. If conf is nil, the result of NewConfig() is used.
  57. func (b *Broker) Open(conf *Config) error {
  58. if !atomic.CompareAndSwapInt32(&b.opened, 0, 1) {
  59. return ErrAlreadyConnected
  60. }
  61. if conf == nil {
  62. conf = NewConfig()
  63. }
  64. err := conf.Validate()
  65. if err != nil {
  66. return err
  67. }
  68. b.lock.Lock()
  69. go withRecover(func() {
  70. defer b.lock.Unlock()
  71. dialer := net.Dialer{
  72. Timeout: conf.Net.DialTimeout,
  73. KeepAlive: conf.Net.KeepAlive,
  74. }
  75. if conf.Net.TLS.Enable {
  76. b.conn, b.connErr = tls.DialWithDialer(&dialer, "tcp", b.addr, conf.Net.TLS.Config)
  77. } else {
  78. b.conn, b.connErr = dialer.Dial("tcp", b.addr)
  79. }
  80. if b.connErr != nil {
  81. Logger.Printf("Failed to connect to broker %s: %s\n", b.addr, b.connErr)
  82. b.conn = nil
  83. atomic.StoreInt32(&b.opened, 0)
  84. return
  85. }
  86. b.conn = newBufConn(b.conn)
  87. b.conf = conf
  88. // Create or reuse the global metrics shared between brokers
  89. b.incomingByteRate = metrics.GetOrRegisterMeter("incoming-byte-rate", conf.MetricRegistry)
  90. b.requestRate = metrics.GetOrRegisterMeter("request-rate", conf.MetricRegistry)
  91. b.requestSize = getOrRegisterHistogram("request-size", conf.MetricRegistry)
  92. b.requestLatency = getOrRegisterHistogram("request-latency-in-ms", conf.MetricRegistry)
  93. b.outgoingByteRate = metrics.GetOrRegisterMeter("outgoing-byte-rate", conf.MetricRegistry)
  94. b.responseRate = metrics.GetOrRegisterMeter("response-rate", conf.MetricRegistry)
  95. b.responseSize = getOrRegisterHistogram("response-size", conf.MetricRegistry)
  96. // Do not gather metrics for seeded broker (only used during bootstrap) because they share
  97. // the same id (-1) and are already exposed through the global metrics above
  98. if b.id >= 0 {
  99. b.brokerIncomingByteRate = getOrRegisterBrokerMeter("incoming-byte-rate", b, conf.MetricRegistry)
  100. b.brokerRequestRate = getOrRegisterBrokerMeter("request-rate", b, conf.MetricRegistry)
  101. b.brokerRequestSize = getOrRegisterBrokerHistogram("request-size", b, conf.MetricRegistry)
  102. b.brokerRequestLatency = getOrRegisterBrokerHistogram("request-latency-in-ms", b, conf.MetricRegistry)
  103. b.brokerOutgoingByteRate = getOrRegisterBrokerMeter("outgoing-byte-rate", b, conf.MetricRegistry)
  104. b.brokerResponseRate = getOrRegisterBrokerMeter("response-rate", b, conf.MetricRegistry)
  105. b.brokerResponseSize = getOrRegisterBrokerHistogram("response-size", b, conf.MetricRegistry)
  106. }
  107. if conf.Net.SASL.Enable {
  108. b.connErr = b.sendAndReceiveSASLPlainAuth()
  109. if b.connErr != nil {
  110. err = b.conn.Close()
  111. if err == nil {
  112. Logger.Printf("Closed connection to broker %s\n", b.addr)
  113. } else {
  114. Logger.Printf("Error while closing connection to broker %s: %s\n", b.addr, err)
  115. }
  116. b.conn = nil
  117. atomic.StoreInt32(&b.opened, 0)
  118. return
  119. }
  120. }
  121. b.done = make(chan bool)
  122. b.responses = make(chan responsePromise, b.conf.Net.MaxOpenRequests-1)
  123. if b.id >= 0 {
  124. Logger.Printf("Connected to broker at %s (registered as #%d)\n", b.addr, b.id)
  125. } else {
  126. Logger.Printf("Connected to broker at %s (unregistered)\n", b.addr)
  127. }
  128. go withRecover(b.responseReceiver)
  129. })
  130. return nil
  131. }
  132. // Connected returns true if the broker is connected and false otherwise. If the broker is not
  133. // connected but it had tried to connect, the error from that connection attempt is also returned.
  134. func (b *Broker) Connected() (bool, error) {
  135. b.lock.Lock()
  136. defer b.lock.Unlock()
  137. return b.conn != nil, b.connErr
  138. }
  139. func (b *Broker) Close() error {
  140. b.lock.Lock()
  141. defer b.lock.Unlock()
  142. if b.conn == nil {
  143. return ErrNotConnected
  144. }
  145. close(b.responses)
  146. <-b.done
  147. err := b.conn.Close()
  148. b.conn = nil
  149. b.connErr = nil
  150. b.done = nil
  151. b.responses = nil
  152. if b.id >= 0 {
  153. b.conf.MetricRegistry.Unregister(getMetricNameForBroker("incoming-byte-rate", b))
  154. b.conf.MetricRegistry.Unregister(getMetricNameForBroker("request-rate", b))
  155. b.conf.MetricRegistry.Unregister(getMetricNameForBroker("outgoing-byte-rate", b))
  156. b.conf.MetricRegistry.Unregister(getMetricNameForBroker("response-rate", b))
  157. }
  158. if err == nil {
  159. Logger.Printf("Closed connection to broker %s\n", b.addr)
  160. } else {
  161. Logger.Printf("Error while closing connection to broker %s: %s\n", b.addr, err)
  162. }
  163. atomic.StoreInt32(&b.opened, 0)
  164. return err
  165. }
  166. // ID returns the broker ID retrieved from Kafka's metadata, or -1 if that is not known.
  167. func (b *Broker) ID() int32 {
  168. return b.id
  169. }
  170. // Addr returns the broker address as either retrieved from Kafka's metadata or passed to NewBroker.
  171. func (b *Broker) Addr() string {
  172. return b.addr
  173. }
  174. func (b *Broker) GetMetadata(request *MetadataRequest) (*MetadataResponse, error) {
  175. response := new(MetadataResponse)
  176. err := b.sendAndReceive(request, response)
  177. if err != nil {
  178. return nil, err
  179. }
  180. return response, nil
  181. }
  182. func (b *Broker) GetConsumerMetadata(request *ConsumerMetadataRequest) (*ConsumerMetadataResponse, error) {
  183. response := new(ConsumerMetadataResponse)
  184. err := b.sendAndReceive(request, response)
  185. if err != nil {
  186. return nil, err
  187. }
  188. return response, nil
  189. }
  190. func (b *Broker) GetAvailableOffsets(request *OffsetRequest) (*OffsetResponse, error) {
  191. response := new(OffsetResponse)
  192. err := b.sendAndReceive(request, response)
  193. if err != nil {
  194. return nil, err
  195. }
  196. return response, nil
  197. }
  198. func (b *Broker) Produce(request *ProduceRequest) (*ProduceResponse, error) {
  199. var response *ProduceResponse
  200. var err error
  201. if request.RequiredAcks == NoResponse {
  202. err = b.sendAndReceive(request, nil)
  203. } else {
  204. response = new(ProduceResponse)
  205. err = b.sendAndReceive(request, response)
  206. }
  207. if err != nil {
  208. return nil, err
  209. }
  210. return response, nil
  211. }
  212. func (b *Broker) Fetch(request *FetchRequest) (*FetchResponse, error) {
  213. response := new(FetchResponse)
  214. err := b.sendAndReceive(request, response)
  215. if err != nil {
  216. return nil, err
  217. }
  218. return response, nil
  219. }
  220. func (b *Broker) CommitOffset(request *OffsetCommitRequest) (*OffsetCommitResponse, error) {
  221. response := new(OffsetCommitResponse)
  222. err := b.sendAndReceive(request, response)
  223. if err != nil {
  224. return nil, err
  225. }
  226. return response, nil
  227. }
  228. func (b *Broker) FetchOffset(request *OffsetFetchRequest) (*OffsetFetchResponse, error) {
  229. response := new(OffsetFetchResponse)
  230. err := b.sendAndReceive(request, response)
  231. if err != nil {
  232. return nil, err
  233. }
  234. return response, nil
  235. }
  236. func (b *Broker) JoinGroup(request *JoinGroupRequest) (*JoinGroupResponse, error) {
  237. response := new(JoinGroupResponse)
  238. err := b.sendAndReceive(request, response)
  239. if err != nil {
  240. return nil, err
  241. }
  242. return response, nil
  243. }
  244. func (b *Broker) SyncGroup(request *SyncGroupRequest) (*SyncGroupResponse, error) {
  245. response := new(SyncGroupResponse)
  246. err := b.sendAndReceive(request, response)
  247. if err != nil {
  248. return nil, err
  249. }
  250. return response, nil
  251. }
  252. func (b *Broker) LeaveGroup(request *LeaveGroupRequest) (*LeaveGroupResponse, error) {
  253. response := new(LeaveGroupResponse)
  254. err := b.sendAndReceive(request, response)
  255. if err != nil {
  256. return nil, err
  257. }
  258. return response, nil
  259. }
  260. func (b *Broker) Heartbeat(request *HeartbeatRequest) (*HeartbeatResponse, error) {
  261. response := new(HeartbeatResponse)
  262. err := b.sendAndReceive(request, response)
  263. if err != nil {
  264. return nil, err
  265. }
  266. return response, nil
  267. }
  268. func (b *Broker) ListGroups(request *ListGroupsRequest) (*ListGroupsResponse, error) {
  269. response := new(ListGroupsResponse)
  270. err := b.sendAndReceive(request, response)
  271. if err != nil {
  272. return nil, err
  273. }
  274. return response, nil
  275. }
  276. func (b *Broker) DescribeGroups(request *DescribeGroupsRequest) (*DescribeGroupsResponse, error) {
  277. response := new(DescribeGroupsResponse)
  278. err := b.sendAndReceive(request, response)
  279. if err != nil {
  280. return nil, err
  281. }
  282. return response, nil
  283. }
  284. func (b *Broker) ApiVersions(request *ApiVersionsRequest) (*ApiVersionsResponse, error) {
  285. response := new(ApiVersionsResponse)
  286. err := b.sendAndReceive(request, response)
  287. if err != nil {
  288. return nil, err
  289. }
  290. return response, nil
  291. }
  292. func (b *Broker) CreateTopics(request *CreateTopicsRequest) (*CreateTopicsResponse, error) {
  293. response := new(CreateTopicsResponse)
  294. err := b.sendAndReceive(request, response)
  295. if err != nil {
  296. return nil, err
  297. }
  298. return response, nil
  299. }
  300. func (b *Broker) DeleteTopics(request *DeleteTopicsRequest) (*DeleteTopicsResponse, error) {
  301. response := new(DeleteTopicsResponse)
  302. err := b.sendAndReceive(request, response)
  303. if err != nil {
  304. return nil, err
  305. }
  306. return response, nil
  307. }
  308. func (b *Broker) DescribeAcls(request *DescribeAclsRequest) (*DescribeAclsResponse, error) {
  309. response := new(DescribeAclsResponse)
  310. err := b.sendAndReceive(request, response)
  311. if err != nil {
  312. return nil, err
  313. }
  314. return response, nil
  315. }
  316. func (b *Broker) CreateAcls(request *CreateAclsRequest) (*CreateAclsResponse, error) {
  317. response := new(CreateAclsResponse)
  318. err := b.sendAndReceive(request, response)
  319. if err != nil {
  320. return nil, err
  321. }
  322. return response, nil
  323. }
  324. func (b *Broker) DeleteAcls(request *DeleteAclsRequest) (*DeleteAclsResponse, error) {
  325. response := new(DeleteAclsResponse)
  326. err := b.sendAndReceive(request, response)
  327. if err != nil {
  328. return nil, err
  329. }
  330. return response, nil
  331. }
  332. func (b *Broker) InitProducerID(request *InitProducerIDRequest) (*InitProducerIDResponse, error) {
  333. response := new(InitProducerIDResponse)
  334. err := b.sendAndReceive(request, response)
  335. if err != nil {
  336. return nil, err
  337. }
  338. return response, nil
  339. }
  340. func (b *Broker) AddPartitionsToTxn(request *AddPartitionsToTxnRequest) (*AddPartitionsToTxnResponse, error) {
  341. response := new(AddPartitionsToTxnResponse)
  342. err := b.sendAndReceive(request, response)
  343. if err != nil {
  344. return nil, err
  345. }
  346. return response, nil
  347. }
  348. func (b *Broker) AddOffsetsToTxn(request *AddOffsetsToTxnRequest) (*AddOffsetsToTxnResponse, error) {
  349. response := new(AddOffsetsToTxnResponse)
  350. err := b.sendAndReceive(request, response)
  351. if err != nil {
  352. return nil, err
  353. }
  354. return response, nil
  355. }
  356. func (b *Broker) EndTxn(request *EndTxnRequest) (*EndTxnResponse, error) {
  357. response := new(EndTxnResponse)
  358. err := b.sendAndReceive(request, response)
  359. if err != nil {
  360. return nil, err
  361. }
  362. return response, nil
  363. }
  364. func (b *Broker) TxnOffsetCommit(request *TxnOffsetCommitRequest) (*TxnOffsetCommitResponse, error) {
  365. response := new(TxnOffsetCommitResponse)
  366. err := b.sendAndReceive(request, response)
  367. if err != nil {
  368. return nil, err
  369. }
  370. return response, nil
  371. }
  372. func (b *Broker) DescribeConfigs(request *DescribeConfigsRequest) (*DescribeConfigsResponse, error) {
  373. response := new(DescribeConfigsResponse)
  374. err := b.sendAndReceive(request, response)
  375. if err != nil {
  376. return nil, err
  377. }
  378. return response, nil
  379. }
  380. func (b *Broker) AlterConfigs(request *AlterConfigsRequest) (*AlterConfigsResponse, error) {
  381. response := new(AlterConfigsResponse)
  382. err := b.sendAndReceive(request, response)
  383. if err != nil {
  384. return nil, err
  385. }
  386. return response, nil
  387. }
  388. func (b *Broker) send(rb protocolBody, promiseResponse bool) (*responsePromise, error) {
  389. b.lock.Lock()
  390. defer b.lock.Unlock()
  391. if b.conn == nil {
  392. if b.connErr != nil {
  393. return nil, b.connErr
  394. }
  395. return nil, ErrNotConnected
  396. }
  397. if !b.conf.Version.IsAtLeast(rb.requiredVersion()) {
  398. return nil, ErrUnsupportedVersion
  399. }
  400. req := &request{correlationID: b.correlationID, clientID: b.conf.ClientID, body: rb}
  401. buf, err := encode(req, b.conf.MetricRegistry)
  402. if err != nil {
  403. return nil, err
  404. }
  405. err = b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout))
  406. if err != nil {
  407. return nil, err
  408. }
  409. requestTime := time.Now()
  410. bytes, err := b.conn.Write(buf)
  411. b.updateOutgoingCommunicationMetrics(bytes)
  412. if err != nil {
  413. return nil, err
  414. }
  415. b.correlationID++
  416. if !promiseResponse {
  417. // Record request latency without the response
  418. b.updateRequestLatencyMetrics(time.Since(requestTime))
  419. return nil, nil
  420. }
  421. promise := responsePromise{requestTime, req.correlationID, make(chan []byte), make(chan error)}
  422. b.responses <- promise
  423. return &promise, nil
  424. }
  425. func (b *Broker) sendAndReceive(req protocolBody, res versionedDecoder) error {
  426. promise, err := b.send(req, res != nil)
  427. if err != nil {
  428. return err
  429. }
  430. if promise == nil {
  431. return nil
  432. }
  433. select {
  434. case buf := <-promise.packets:
  435. return versionedDecode(buf, res, req.version())
  436. case err = <-promise.errors:
  437. return err
  438. }
  439. }
  440. func (b *Broker) decode(pd packetDecoder) (err error) {
  441. b.id, err = pd.getInt32()
  442. if err != nil {
  443. return err
  444. }
  445. host, err := pd.getString()
  446. if err != nil {
  447. return err
  448. }
  449. port, err := pd.getInt32()
  450. if err != nil {
  451. return err
  452. }
  453. b.addr = net.JoinHostPort(host, fmt.Sprint(port))
  454. if _, _, err := net.SplitHostPort(b.addr); err != nil {
  455. return err
  456. }
  457. return nil
  458. }
  459. func (b *Broker) encode(pe packetEncoder) (err error) {
  460. host, portstr, err := net.SplitHostPort(b.addr)
  461. if err != nil {
  462. return err
  463. }
  464. port, err := strconv.Atoi(portstr)
  465. if err != nil {
  466. return err
  467. }
  468. pe.putInt32(b.id)
  469. err = pe.putString(host)
  470. if err != nil {
  471. return err
  472. }
  473. pe.putInt32(int32(port))
  474. return nil
  475. }
  476. func (b *Broker) responseReceiver() {
  477. var dead error
  478. header := make([]byte, 8)
  479. for response := range b.responses {
  480. if dead != nil {
  481. response.errors <- dead
  482. continue
  483. }
  484. err := b.conn.SetReadDeadline(time.Now().Add(b.conf.Net.ReadTimeout))
  485. if err != nil {
  486. dead = err
  487. response.errors <- err
  488. continue
  489. }
  490. bytesReadHeader, err := io.ReadFull(b.conn, header)
  491. requestLatency := time.Since(response.requestTime)
  492. if err != nil {
  493. b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
  494. dead = err
  495. response.errors <- err
  496. continue
  497. }
  498. decodedHeader := responseHeader{}
  499. err = decode(header, &decodedHeader)
  500. if err != nil {
  501. b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
  502. dead = err
  503. response.errors <- err
  504. continue
  505. }
  506. if decodedHeader.correlationID != response.correlationID {
  507. b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
  508. // TODO if decoded ID < cur ID, discard until we catch up
  509. // TODO if decoded ID > cur ID, save it so when cur ID catches up we have a response
  510. dead = PacketDecodingError{fmt.Sprintf("correlation ID didn't match, wanted %d, got %d", response.correlationID, decodedHeader.correlationID)}
  511. response.errors <- dead
  512. continue
  513. }
  514. buf := make([]byte, decodedHeader.length-4)
  515. bytesReadBody, err := io.ReadFull(b.conn, buf)
  516. b.updateIncomingCommunicationMetrics(bytesReadHeader+bytesReadBody, requestLatency)
  517. if err != nil {
  518. dead = err
  519. response.errors <- err
  520. continue
  521. }
  522. response.packets <- buf
  523. }
  524. close(b.done)
  525. }
  526. func (b *Broker) sendAndReceiveSASLPlainHandshake() error {
  527. rb := &SaslHandshakeRequest{"PLAIN"}
  528. req := &request{correlationID: b.correlationID, clientID: b.conf.ClientID, body: rb}
  529. buf, err := encode(req, b.conf.MetricRegistry)
  530. if err != nil {
  531. return err
  532. }
  533. err = b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout))
  534. if err != nil {
  535. return err
  536. }
  537. requestTime := time.Now()
  538. bytes, err := b.conn.Write(buf)
  539. b.updateOutgoingCommunicationMetrics(bytes)
  540. if err != nil {
  541. Logger.Printf("Failed to send SASL handshake %s: %s\n", b.addr, err.Error())
  542. return err
  543. }
  544. b.correlationID++
  545. //wait for the response
  546. header := make([]byte, 8) // response header
  547. _, err = io.ReadFull(b.conn, header)
  548. if err != nil {
  549. Logger.Printf("Failed to read SASL handshake header : %s\n", err.Error())
  550. return err
  551. }
  552. length := binary.BigEndian.Uint32(header[:4])
  553. payload := make([]byte, length-4)
  554. n, err := io.ReadFull(b.conn, payload)
  555. if err != nil {
  556. Logger.Printf("Failed to read SASL handshake payload : %s\n", err.Error())
  557. return err
  558. }
  559. b.updateIncomingCommunicationMetrics(n+8, time.Since(requestTime))
  560. res := &SaslHandshakeResponse{}
  561. err = versionedDecode(payload, res, 0)
  562. if err != nil {
  563. Logger.Printf("Failed to parse SASL handshake : %s\n", err.Error())
  564. return err
  565. }
  566. if res.Err != ErrNoError {
  567. Logger.Printf("Invalid SASL Mechanism : %s\n", res.Err.Error())
  568. return res.Err
  569. }
  570. Logger.Print("Successful SASL handshake")
  571. return nil
  572. }
  573. // Kafka 0.10.0 plans to support SASL Plain and Kerberos as per PR #812 (KIP-43)/(JIRA KAFKA-3149)
  574. // Some hosted kafka services such as IBM Message Hub already offer SASL/PLAIN auth with Kafka 0.9
  575. //
  576. // In SASL Plain, Kafka expects the auth header to be in the following format
  577. // Message format (from https://tools.ietf.org/html/rfc4616):
  578. //
  579. // message = [authzid] UTF8NUL authcid UTF8NUL passwd
  580. // authcid = 1*SAFE ; MUST accept up to 255 octets
  581. // authzid = 1*SAFE ; MUST accept up to 255 octets
  582. // passwd = 1*SAFE ; MUST accept up to 255 octets
  583. // UTF8NUL = %x00 ; UTF-8 encoded NUL character
  584. //
  585. // SAFE = UTF1 / UTF2 / UTF3 / UTF4
  586. // ;; any UTF-8 encoded Unicode character except NUL
  587. //
  588. // When credentials are valid, Kafka returns a 4 byte array of null characters.
  589. // When credentials are invalid, Kafka closes the connection. This does not seem to be the ideal way
  590. // of responding to bad credentials but thats how its being done today.
  591. func (b *Broker) sendAndReceiveSASLPlainAuth() error {
  592. if b.conf.Net.SASL.Handshake {
  593. handshakeErr := b.sendAndReceiveSASLPlainHandshake()
  594. if handshakeErr != nil {
  595. Logger.Printf("Error while performing SASL handshake %s\n", b.addr)
  596. return handshakeErr
  597. }
  598. }
  599. length := 1 + len(b.conf.Net.SASL.User) + 1 + len(b.conf.Net.SASL.Password)
  600. authBytes := make([]byte, length+4) //4 byte length header + auth data
  601. binary.BigEndian.PutUint32(authBytes, uint32(length))
  602. copy(authBytes[4:], []byte("\x00"+b.conf.Net.SASL.User+"\x00"+b.conf.Net.SASL.Password))
  603. err := b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout))
  604. if err != nil {
  605. Logger.Printf("Failed to set write deadline when doing SASL auth with broker %s: %s\n", b.addr, err.Error())
  606. return err
  607. }
  608. requestTime := time.Now()
  609. bytesWritten, err := b.conn.Write(authBytes)
  610. b.updateOutgoingCommunicationMetrics(bytesWritten)
  611. if err != nil {
  612. Logger.Printf("Failed to write SASL auth header to broker %s: %s\n", b.addr, err.Error())
  613. return err
  614. }
  615. header := make([]byte, 4)
  616. n, err := io.ReadFull(b.conn, header)
  617. b.updateIncomingCommunicationMetrics(n, time.Since(requestTime))
  618. // If the credentials are valid, we would get a 4 byte response filled with null characters.
  619. // Otherwise, the broker closes the connection and we get an EOF
  620. if err != nil {
  621. Logger.Printf("Failed to read response while authenticating with SASL to broker %s: %s\n", b.addr, err.Error())
  622. return err
  623. }
  624. Logger.Printf("SASL authentication successful with broker %s:%v - %v\n", b.addr, n, header)
  625. return nil
  626. }
  627. func (b *Broker) updateIncomingCommunicationMetrics(bytes int, requestLatency time.Duration) {
  628. b.updateRequestLatencyMetrics(requestLatency)
  629. b.responseRate.Mark(1)
  630. if b.brokerResponseRate != nil {
  631. b.brokerResponseRate.Mark(1)
  632. }
  633. responseSize := int64(bytes)
  634. b.incomingByteRate.Mark(responseSize)
  635. if b.brokerIncomingByteRate != nil {
  636. b.brokerIncomingByteRate.Mark(responseSize)
  637. }
  638. b.responseSize.Update(responseSize)
  639. if b.brokerResponseSize != nil {
  640. b.brokerResponseSize.Update(responseSize)
  641. }
  642. }
  643. func (b *Broker) updateRequestLatencyMetrics(requestLatency time.Duration) {
  644. requestLatencyInMs := int64(requestLatency / time.Millisecond)
  645. b.requestLatency.Update(requestLatencyInMs)
  646. if b.brokerRequestLatency != nil {
  647. b.brokerRequestLatency.Update(requestLatencyInMs)
  648. }
  649. }
  650. func (b *Broker) updateOutgoingCommunicationMetrics(bytes int) {
  651. b.requestRate.Mark(1)
  652. if b.brokerRequestRate != nil {
  653. b.brokerRequestRate.Mark(1)
  654. }
  655. requestSize := int64(bytes)
  656. b.outgoingByteRate.Mark(requestSize)
  657. if b.brokerOutgoingByteRate != nil {
  658. b.brokerOutgoingByteRate.Mark(requestSize)
  659. }
  660. b.requestSize.Update(requestSize)
  661. if b.brokerRequestSize != nil {
  662. b.brokerRequestSize.Update(requestSize)
  663. }
  664. }