broker.go 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872
  1. package sarama
  2. import (
  3. "crypto/tls"
  4. "encoding/binary"
  5. "fmt"
  6. "io"
  7. "net"
  8. "strconv"
  9. "sync"
  10. "sync/atomic"
  11. "time"
  12. "github.com/rcrowley/go-metrics"
  13. )
  14. // Broker represents a single Kafka broker connection. All operations on this object are entirely concurrency-safe.
  15. type Broker struct {
  16. id int32
  17. addr string
  18. rack *string
  19. conf *Config
  20. correlationID int32
  21. conn net.Conn
  22. connErr error
  23. lock sync.Mutex
  24. opened int32
  25. responses chan responsePromise
  26. done chan bool
  27. incomingByteRate metrics.Meter
  28. requestRate metrics.Meter
  29. requestSize metrics.Histogram
  30. requestLatency metrics.Histogram
  31. outgoingByteRate metrics.Meter
  32. responseRate metrics.Meter
  33. responseSize metrics.Histogram
  34. brokerIncomingByteRate metrics.Meter
  35. brokerRequestRate metrics.Meter
  36. brokerRequestSize metrics.Histogram
  37. brokerRequestLatency metrics.Histogram
  38. brokerOutgoingByteRate metrics.Meter
  39. brokerResponseRate metrics.Meter
  40. brokerResponseSize metrics.Histogram
  41. }
  42. type responsePromise struct {
  43. requestTime time.Time
  44. correlationID int32
  45. packets chan []byte
  46. errors chan error
  47. }
  48. // NewBroker creates and returns a Broker targeting the given host:port address.
  49. // This does not attempt to actually connect, you have to call Open() for that.
  50. func NewBroker(addr string) *Broker {
  51. return &Broker{id: -1, addr: addr}
  52. }
  53. // Open tries to connect to the Broker if it is not already connected or connecting, but does not block
  54. // waiting for the connection to complete. This means that any subsequent operations on the broker will
  55. // block waiting for the connection to succeed or fail. To get the effect of a fully synchronous Open call,
  56. // follow it by a call to Connected(). The only errors Open will return directly are ConfigurationError or
  57. // AlreadyConnected. If conf is nil, the result of NewConfig() is used.
  58. func (b *Broker) Open(conf *Config) error {
  59. if !atomic.CompareAndSwapInt32(&b.opened, 0, 1) {
  60. return ErrAlreadyConnected
  61. }
  62. if conf == nil {
  63. conf = NewConfig()
  64. }
  65. err := conf.Validate()
  66. if err != nil {
  67. return err
  68. }
  69. b.lock.Lock()
  70. go withRecover(func() {
  71. defer b.lock.Unlock()
  72. dialer := net.Dialer{
  73. Timeout: conf.Net.DialTimeout,
  74. KeepAlive: conf.Net.KeepAlive,
  75. }
  76. if conf.Net.TLS.Enable {
  77. b.conn, b.connErr = tls.DialWithDialer(&dialer, "tcp", b.addr, conf.Net.TLS.Config)
  78. } else {
  79. b.conn, b.connErr = dialer.Dial("tcp", b.addr)
  80. }
  81. if b.connErr != nil {
  82. Logger.Printf("Failed to connect to broker %s: %s\n", b.addr, b.connErr)
  83. b.conn = nil
  84. atomic.StoreInt32(&b.opened, 0)
  85. return
  86. }
  87. b.conn = newBufConn(b.conn)
  88. b.conf = conf
  89. // Create or reuse the global metrics shared between brokers
  90. b.incomingByteRate = metrics.GetOrRegisterMeter("incoming-byte-rate", conf.MetricRegistry)
  91. b.requestRate = metrics.GetOrRegisterMeter("request-rate", conf.MetricRegistry)
  92. b.requestSize = getOrRegisterHistogram("request-size", conf.MetricRegistry)
  93. b.requestLatency = getOrRegisterHistogram("request-latency-in-ms", conf.MetricRegistry)
  94. b.outgoingByteRate = metrics.GetOrRegisterMeter("outgoing-byte-rate", conf.MetricRegistry)
  95. b.responseRate = metrics.GetOrRegisterMeter("response-rate", conf.MetricRegistry)
  96. b.responseSize = getOrRegisterHistogram("response-size", conf.MetricRegistry)
  97. // Do not gather metrics for seeded broker (only used during bootstrap) because they share
  98. // the same id (-1) and are already exposed through the global metrics above
  99. if b.id >= 0 {
  100. b.brokerIncomingByteRate = getOrRegisterBrokerMeter("incoming-byte-rate", b, conf.MetricRegistry)
  101. b.brokerRequestRate = getOrRegisterBrokerMeter("request-rate", b, conf.MetricRegistry)
  102. b.brokerRequestSize = getOrRegisterBrokerHistogram("request-size", b, conf.MetricRegistry)
  103. b.brokerRequestLatency = getOrRegisterBrokerHistogram("request-latency-in-ms", b, conf.MetricRegistry)
  104. b.brokerOutgoingByteRate = getOrRegisterBrokerMeter("outgoing-byte-rate", b, conf.MetricRegistry)
  105. b.brokerResponseRate = getOrRegisterBrokerMeter("response-rate", b, conf.MetricRegistry)
  106. b.brokerResponseSize = getOrRegisterBrokerHistogram("response-size", b, conf.MetricRegistry)
  107. }
  108. if conf.Net.SASL.Enable {
  109. b.connErr = b.sendAndReceiveSASLPlainAuth()
  110. if b.connErr != nil {
  111. err = b.conn.Close()
  112. if err == nil {
  113. Logger.Printf("Closed connection to broker %s\n", b.addr)
  114. } else {
  115. Logger.Printf("Error while closing connection to broker %s: %s\n", b.addr, err)
  116. }
  117. b.conn = nil
  118. atomic.StoreInt32(&b.opened, 0)
  119. return
  120. }
  121. }
  122. b.done = make(chan bool)
  123. b.responses = make(chan responsePromise, b.conf.Net.MaxOpenRequests-1)
  124. if b.id >= 0 {
  125. Logger.Printf("Connected to broker at %s (registered as #%d)\n", b.addr, b.id)
  126. } else {
  127. Logger.Printf("Connected to broker at %s (unregistered)\n", b.addr)
  128. }
  129. go withRecover(b.responseReceiver)
  130. })
  131. return nil
  132. }
  133. // Connected returns true if the broker is connected and false otherwise. If the broker is not
  134. // connected but it had tried to connect, the error from that connection attempt is also returned.
  135. func (b *Broker) Connected() (bool, error) {
  136. b.lock.Lock()
  137. defer b.lock.Unlock()
  138. return b.conn != nil, b.connErr
  139. }
  140. func (b *Broker) Close() error {
  141. b.lock.Lock()
  142. defer b.lock.Unlock()
  143. if b.conn == nil {
  144. return ErrNotConnected
  145. }
  146. close(b.responses)
  147. <-b.done
  148. err := b.conn.Close()
  149. b.conn = nil
  150. b.connErr = nil
  151. b.done = nil
  152. b.responses = nil
  153. if b.id >= 0 {
  154. b.conf.MetricRegistry.Unregister(getMetricNameForBroker("incoming-byte-rate", b))
  155. b.conf.MetricRegistry.Unregister(getMetricNameForBroker("request-rate", b))
  156. b.conf.MetricRegistry.Unregister(getMetricNameForBroker("outgoing-byte-rate", b))
  157. b.conf.MetricRegistry.Unregister(getMetricNameForBroker("response-rate", b))
  158. }
  159. if err == nil {
  160. Logger.Printf("Closed connection to broker %s\n", b.addr)
  161. } else {
  162. Logger.Printf("Error while closing connection to broker %s: %s\n", b.addr, err)
  163. }
  164. atomic.StoreInt32(&b.opened, 0)
  165. return err
  166. }
  167. // ID returns the broker ID retrieved from Kafka's metadata, or -1 if that is not known.
  168. func (b *Broker) ID() int32 {
  169. return b.id
  170. }
  171. // Addr returns the broker address as either retrieved from Kafka's metadata or passed to NewBroker.
  172. func (b *Broker) Addr() string {
  173. return b.addr
  174. }
  175. func (b *Broker) GetMetadata(request *MetadataRequest) (*MetadataResponse, error) {
  176. response := new(MetadataResponse)
  177. err := b.sendAndReceive(request, response)
  178. if err != nil {
  179. return nil, err
  180. }
  181. return response, nil
  182. }
  183. func (b *Broker) GetConsumerMetadata(request *ConsumerMetadataRequest) (*ConsumerMetadataResponse, error) {
  184. response := new(ConsumerMetadataResponse)
  185. err := b.sendAndReceive(request, response)
  186. if err != nil {
  187. return nil, err
  188. }
  189. return response, nil
  190. }
  191. func (b *Broker) FindCoordinator(request *FindCoordinatorRequest) (*FindCoordinatorResponse, error) {
  192. response := new(FindCoordinatorResponse)
  193. err := b.sendAndReceive(request, response)
  194. if err != nil {
  195. return nil, err
  196. }
  197. return response, nil
  198. }
  199. func (b *Broker) GetAvailableOffsets(request *OffsetRequest) (*OffsetResponse, error) {
  200. response := new(OffsetResponse)
  201. err := b.sendAndReceive(request, response)
  202. if err != nil {
  203. return nil, err
  204. }
  205. return response, nil
  206. }
  207. func (b *Broker) Produce(request *ProduceRequest) (*ProduceResponse, error) {
  208. var response *ProduceResponse
  209. var err error
  210. if request.RequiredAcks == NoResponse {
  211. err = b.sendAndReceive(request, nil)
  212. } else {
  213. response = new(ProduceResponse)
  214. err = b.sendAndReceive(request, response)
  215. }
  216. if err != nil {
  217. return nil, err
  218. }
  219. return response, nil
  220. }
  221. func (b *Broker) Fetch(request *FetchRequest) (*FetchResponse, error) {
  222. response := new(FetchResponse)
  223. err := b.sendAndReceive(request, response)
  224. if err != nil {
  225. return nil, err
  226. }
  227. return response, nil
  228. }
  229. func (b *Broker) CommitOffset(request *OffsetCommitRequest) (*OffsetCommitResponse, error) {
  230. response := new(OffsetCommitResponse)
  231. err := b.sendAndReceive(request, response)
  232. if err != nil {
  233. return nil, err
  234. }
  235. return response, nil
  236. }
  237. func (b *Broker) FetchOffset(request *OffsetFetchRequest) (*OffsetFetchResponse, error) {
  238. response := new(OffsetFetchResponse)
  239. err := b.sendAndReceive(request, response)
  240. if err != nil {
  241. return nil, err
  242. }
  243. return response, nil
  244. }
  245. func (b *Broker) JoinGroup(request *JoinGroupRequest) (*JoinGroupResponse, error) {
  246. response := new(JoinGroupResponse)
  247. err := b.sendAndReceive(request, response)
  248. if err != nil {
  249. return nil, err
  250. }
  251. return response, nil
  252. }
  253. func (b *Broker) SyncGroup(request *SyncGroupRequest) (*SyncGroupResponse, error) {
  254. response := new(SyncGroupResponse)
  255. err := b.sendAndReceive(request, response)
  256. if err != nil {
  257. return nil, err
  258. }
  259. return response, nil
  260. }
  261. func (b *Broker) LeaveGroup(request *LeaveGroupRequest) (*LeaveGroupResponse, error) {
  262. response := new(LeaveGroupResponse)
  263. err := b.sendAndReceive(request, response)
  264. if err != nil {
  265. return nil, err
  266. }
  267. return response, nil
  268. }
  269. func (b *Broker) Heartbeat(request *HeartbeatRequest) (*HeartbeatResponse, error) {
  270. response := new(HeartbeatResponse)
  271. err := b.sendAndReceive(request, response)
  272. if err != nil {
  273. return nil, err
  274. }
  275. return response, nil
  276. }
  277. func (b *Broker) ListGroups(request *ListGroupsRequest) (*ListGroupsResponse, error) {
  278. response := new(ListGroupsResponse)
  279. err := b.sendAndReceive(request, response)
  280. if err != nil {
  281. return nil, err
  282. }
  283. return response, nil
  284. }
  285. func (b *Broker) DescribeGroups(request *DescribeGroupsRequest) (*DescribeGroupsResponse, error) {
  286. response := new(DescribeGroupsResponse)
  287. err := b.sendAndReceive(request, response)
  288. if err != nil {
  289. return nil, err
  290. }
  291. return response, nil
  292. }
  293. func (b *Broker) ApiVersions(request *ApiVersionsRequest) (*ApiVersionsResponse, error) {
  294. response := new(ApiVersionsResponse)
  295. err := b.sendAndReceive(request, response)
  296. if err != nil {
  297. return nil, err
  298. }
  299. return response, nil
  300. }
  301. func (b *Broker) CreatePartitions(request *CreatePartitionsRequest) (*CreatePartitionsResponse, error) {
  302. response := new(CreatePartitionsResponse)
  303. err := b.sendAndReceive(request, response)
  304. if err != nil {
  305. return nil, err
  306. }
  307. return response, nil
  308. }
  309. func (b *Broker) CreateTopics(request *CreateTopicsRequest) (*CreateTopicsResponse, error) {
  310. response := new(CreateTopicsResponse)
  311. err := b.sendAndReceive(request, response)
  312. if err != nil {
  313. return nil, err
  314. }
  315. return response, nil
  316. }
  317. func (b *Broker) DeleteTopics(request *DeleteTopicsRequest) (*DeleteTopicsResponse, error) {
  318. response := new(DeleteTopicsResponse)
  319. err := b.sendAndReceive(request, response)
  320. if err != nil {
  321. return nil, err
  322. }
  323. return response, nil
  324. }
  325. func (b *Broker) DeleteRecords(request *DeleteRecordsRequest) (*DeleteRecordsResponse, error) {
  326. response := new(DeleteRecordsResponse)
  327. err := b.sendAndReceive(request, response)
  328. if err != nil {
  329. return nil, err
  330. }
  331. return response, nil
  332. }
  333. func (b *Broker) DescribeAcls(request *DescribeAclsRequest) (*DescribeAclsResponse, error) {
  334. response := new(DescribeAclsResponse)
  335. err := b.sendAndReceive(request, response)
  336. if err != nil {
  337. return nil, err
  338. }
  339. return response, nil
  340. }
  341. func (b *Broker) CreateAcls(request *CreateAclsRequest) (*CreateAclsResponse, error) {
  342. response := new(CreateAclsResponse)
  343. err := b.sendAndReceive(request, response)
  344. if err != nil {
  345. return nil, err
  346. }
  347. return response, nil
  348. }
  349. func (b *Broker) DeleteAcls(request *DeleteAclsRequest) (*DeleteAclsResponse, error) {
  350. response := new(DeleteAclsResponse)
  351. err := b.sendAndReceive(request, response)
  352. if err != nil {
  353. return nil, err
  354. }
  355. return response, nil
  356. }
  357. func (b *Broker) InitProducerID(request *InitProducerIDRequest) (*InitProducerIDResponse, error) {
  358. response := new(InitProducerIDResponse)
  359. err := b.sendAndReceive(request, response)
  360. if err != nil {
  361. return nil, err
  362. }
  363. return response, nil
  364. }
  365. func (b *Broker) AddPartitionsToTxn(request *AddPartitionsToTxnRequest) (*AddPartitionsToTxnResponse, error) {
  366. response := new(AddPartitionsToTxnResponse)
  367. err := b.sendAndReceive(request, response)
  368. if err != nil {
  369. return nil, err
  370. }
  371. return response, nil
  372. }
  373. func (b *Broker) AddOffsetsToTxn(request *AddOffsetsToTxnRequest) (*AddOffsetsToTxnResponse, error) {
  374. response := new(AddOffsetsToTxnResponse)
  375. err := b.sendAndReceive(request, response)
  376. if err != nil {
  377. return nil, err
  378. }
  379. return response, nil
  380. }
  381. func (b *Broker) EndTxn(request *EndTxnRequest) (*EndTxnResponse, error) {
  382. response := new(EndTxnResponse)
  383. err := b.sendAndReceive(request, response)
  384. if err != nil {
  385. return nil, err
  386. }
  387. return response, nil
  388. }
  389. func (b *Broker) TxnOffsetCommit(request *TxnOffsetCommitRequest) (*TxnOffsetCommitResponse, error) {
  390. response := new(TxnOffsetCommitResponse)
  391. err := b.sendAndReceive(request, response)
  392. if err != nil {
  393. return nil, err
  394. }
  395. return response, nil
  396. }
  397. func (b *Broker) DescribeConfigs(request *DescribeConfigsRequest) (*DescribeConfigsResponse, error) {
  398. response := new(DescribeConfigsResponse)
  399. err := b.sendAndReceive(request, response)
  400. if err != nil {
  401. return nil, err
  402. }
  403. return response, nil
  404. }
  405. func (b *Broker) AlterConfigs(request *AlterConfigsRequest) (*AlterConfigsResponse, error) {
  406. response := new(AlterConfigsResponse)
  407. err := b.sendAndReceive(request, response)
  408. if err != nil {
  409. return nil, err
  410. }
  411. return response, nil
  412. }
  413. func (b *Broker) send(rb protocolBody, promiseResponse bool) (*responsePromise, error) {
  414. b.lock.Lock()
  415. defer b.lock.Unlock()
  416. if b.conn == nil {
  417. if b.connErr != nil {
  418. return nil, b.connErr
  419. }
  420. return nil, ErrNotConnected
  421. }
  422. if !b.conf.Version.IsAtLeast(rb.requiredVersion()) {
  423. return nil, ErrUnsupportedVersion
  424. }
  425. req := &request{correlationID: b.correlationID, clientID: b.conf.ClientID, body: rb}
  426. buf, err := encode(req, b.conf.MetricRegistry)
  427. if err != nil {
  428. return nil, err
  429. }
  430. err = b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout))
  431. if err != nil {
  432. return nil, err
  433. }
  434. requestTime := time.Now()
  435. bytes, err := b.conn.Write(buf)
  436. b.updateOutgoingCommunicationMetrics(bytes)
  437. if err != nil {
  438. return nil, err
  439. }
  440. b.correlationID++
  441. if !promiseResponse {
  442. // Record request latency without the response
  443. b.updateRequestLatencyMetrics(time.Since(requestTime))
  444. return nil, nil
  445. }
  446. promise := responsePromise{requestTime, req.correlationID, make(chan []byte), make(chan error)}
  447. b.responses <- promise
  448. return &promise, nil
  449. }
  450. func (b *Broker) sendAndReceive(req protocolBody, res versionedDecoder) error {
  451. promise, err := b.send(req, res != nil)
  452. if err != nil {
  453. return err
  454. }
  455. if promise == nil {
  456. return nil
  457. }
  458. select {
  459. case buf := <-promise.packets:
  460. return versionedDecode(buf, res, req.version())
  461. case err = <-promise.errors:
  462. return err
  463. }
  464. }
  465. func (b *Broker) decode(pd packetDecoder, version int16) (err error) {
  466. b.id, err = pd.getInt32()
  467. if err != nil {
  468. return err
  469. }
  470. host, err := pd.getString()
  471. if err != nil {
  472. return err
  473. }
  474. port, err := pd.getInt32()
  475. if err != nil {
  476. return err
  477. }
  478. if version >= 1 {
  479. b.rack, err = pd.getNullableString()
  480. if err != nil {
  481. return err
  482. }
  483. }
  484. b.addr = net.JoinHostPort(host, fmt.Sprint(port))
  485. if _, _, err := net.SplitHostPort(b.addr); err != nil {
  486. return err
  487. }
  488. return nil
  489. }
  490. func (b *Broker) encode(pe packetEncoder, version int16) (err error) {
  491. host, portstr, err := net.SplitHostPort(b.addr)
  492. if err != nil {
  493. return err
  494. }
  495. port, err := strconv.Atoi(portstr)
  496. if err != nil {
  497. return err
  498. }
  499. pe.putInt32(b.id)
  500. err = pe.putString(host)
  501. if err != nil {
  502. return err
  503. }
  504. pe.putInt32(int32(port))
  505. if version >= 1 {
  506. err = pe.putNullableString(b.rack)
  507. if err != nil {
  508. return err
  509. }
  510. }
  511. return nil
  512. }
  513. func (b *Broker) responseReceiver() {
  514. var dead error
  515. header := make([]byte, 8)
  516. for response := range b.responses {
  517. if dead != nil {
  518. response.errors <- dead
  519. continue
  520. }
  521. err := b.conn.SetReadDeadline(time.Now().Add(b.conf.Net.ReadTimeout))
  522. if err != nil {
  523. dead = err
  524. response.errors <- err
  525. continue
  526. }
  527. bytesReadHeader, err := io.ReadFull(b.conn, header)
  528. requestLatency := time.Since(response.requestTime)
  529. if err != nil {
  530. b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
  531. dead = err
  532. response.errors <- err
  533. continue
  534. }
  535. decodedHeader := responseHeader{}
  536. err = decode(header, &decodedHeader)
  537. if err != nil {
  538. b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
  539. dead = err
  540. response.errors <- err
  541. continue
  542. }
  543. if decodedHeader.correlationID != response.correlationID {
  544. b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
  545. // TODO if decoded ID < cur ID, discard until we catch up
  546. // TODO if decoded ID > cur ID, save it so when cur ID catches up we have a response
  547. dead = PacketDecodingError{fmt.Sprintf("correlation ID didn't match, wanted %d, got %d", response.correlationID, decodedHeader.correlationID)}
  548. response.errors <- dead
  549. continue
  550. }
  551. buf := make([]byte, decodedHeader.length-4)
  552. bytesReadBody, err := io.ReadFull(b.conn, buf)
  553. b.updateIncomingCommunicationMetrics(bytesReadHeader+bytesReadBody, requestLatency)
  554. if err != nil {
  555. dead = err
  556. response.errors <- err
  557. continue
  558. }
  559. response.packets <- buf
  560. }
  561. close(b.done)
  562. }
  563. func (b *Broker) sendAndReceiveSASLPlainHandshake() error {
  564. rb := &SaslHandshakeRequest{"PLAIN"}
  565. req := &request{correlationID: b.correlationID, clientID: b.conf.ClientID, body: rb}
  566. buf, err := encode(req, b.conf.MetricRegistry)
  567. if err != nil {
  568. return err
  569. }
  570. err = b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout))
  571. if err != nil {
  572. return err
  573. }
  574. requestTime := time.Now()
  575. bytes, err := b.conn.Write(buf)
  576. b.updateOutgoingCommunicationMetrics(bytes)
  577. if err != nil {
  578. Logger.Printf("Failed to send SASL handshake %s: %s\n", b.addr, err.Error())
  579. return err
  580. }
  581. b.correlationID++
  582. //wait for the response
  583. header := make([]byte, 8) // response header
  584. _, err = io.ReadFull(b.conn, header)
  585. if err != nil {
  586. Logger.Printf("Failed to read SASL handshake header : %s\n", err.Error())
  587. return err
  588. }
  589. length := binary.BigEndian.Uint32(header[:4])
  590. payload := make([]byte, length-4)
  591. n, err := io.ReadFull(b.conn, payload)
  592. if err != nil {
  593. Logger.Printf("Failed to read SASL handshake payload : %s\n", err.Error())
  594. return err
  595. }
  596. b.updateIncomingCommunicationMetrics(n+8, time.Since(requestTime))
  597. res := &SaslHandshakeResponse{}
  598. err = versionedDecode(payload, res, 0)
  599. if err != nil {
  600. Logger.Printf("Failed to parse SASL handshake : %s\n", err.Error())
  601. return err
  602. }
  603. if res.Err != ErrNoError {
  604. Logger.Printf("Invalid SASL Mechanism : %s\n", res.Err.Error())
  605. return res.Err
  606. }
  607. Logger.Print("Successful SASL handshake")
  608. return nil
  609. }
  610. // Kafka 0.10.0 plans to support SASL Plain and Kerberos as per PR #812 (KIP-43)/(JIRA KAFKA-3149)
  611. // Some hosted kafka services such as IBM Message Hub already offer SASL/PLAIN auth with Kafka 0.9
  612. //
  613. // In SASL Plain, Kafka expects the auth header to be in the following format
  614. // Message format (from https://tools.ietf.org/html/rfc4616):
  615. //
  616. // message = [authzid] UTF8NUL authcid UTF8NUL passwd
  617. // authcid = 1*SAFE ; MUST accept up to 255 octets
  618. // authzid = 1*SAFE ; MUST accept up to 255 octets
  619. // passwd = 1*SAFE ; MUST accept up to 255 octets
  620. // UTF8NUL = %x00 ; UTF-8 encoded NUL character
  621. //
  622. // SAFE = UTF1 / UTF2 / UTF3 / UTF4
  623. // ;; any UTF-8 encoded Unicode character except NUL
  624. //
  625. // When credentials are valid, Kafka returns a 4 byte array of null characters.
  626. // When credentials are invalid, Kafka closes the connection. This does not seem to be the ideal way
  627. // of responding to bad credentials but thats how its being done today.
  628. func (b *Broker) sendAndReceiveSASLPlainAuth() error {
  629. if b.conf.Net.SASL.Handshake {
  630. handshakeErr := b.sendAndReceiveSASLPlainHandshake()
  631. if handshakeErr != nil {
  632. Logger.Printf("Error while performing SASL handshake %s\n", b.addr)
  633. return handshakeErr
  634. }
  635. }
  636. length := 1 + len(b.conf.Net.SASL.User) + 1 + len(b.conf.Net.SASL.Password)
  637. authBytes := make([]byte, length+4) //4 byte length header + auth data
  638. binary.BigEndian.PutUint32(authBytes, uint32(length))
  639. copy(authBytes[4:], []byte("\x00"+b.conf.Net.SASL.User+"\x00"+b.conf.Net.SASL.Password))
  640. err := b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout))
  641. if err != nil {
  642. Logger.Printf("Failed to set write deadline when doing SASL auth with broker %s: %s\n", b.addr, err.Error())
  643. return err
  644. }
  645. requestTime := time.Now()
  646. bytesWritten, err := b.conn.Write(authBytes)
  647. b.updateOutgoingCommunicationMetrics(bytesWritten)
  648. if err != nil {
  649. Logger.Printf("Failed to write SASL auth header to broker %s: %s\n", b.addr, err.Error())
  650. return err
  651. }
  652. header := make([]byte, 4)
  653. n, err := io.ReadFull(b.conn, header)
  654. b.updateIncomingCommunicationMetrics(n, time.Since(requestTime))
  655. // If the credentials are valid, we would get a 4 byte response filled with null characters.
  656. // Otherwise, the broker closes the connection and we get an EOF
  657. if err != nil {
  658. Logger.Printf("Failed to read response while authenticating with SASL to broker %s: %s\n", b.addr, err.Error())
  659. return err
  660. }
  661. Logger.Printf("SASL authentication successful with broker %s:%v - %v\n", b.addr, n, header)
  662. return nil
  663. }
  664. func (b *Broker) updateIncomingCommunicationMetrics(bytes int, requestLatency time.Duration) {
  665. b.updateRequestLatencyMetrics(requestLatency)
  666. b.responseRate.Mark(1)
  667. if b.brokerResponseRate != nil {
  668. b.brokerResponseRate.Mark(1)
  669. }
  670. responseSize := int64(bytes)
  671. b.incomingByteRate.Mark(responseSize)
  672. if b.brokerIncomingByteRate != nil {
  673. b.brokerIncomingByteRate.Mark(responseSize)
  674. }
  675. b.responseSize.Update(responseSize)
  676. if b.brokerResponseSize != nil {
  677. b.brokerResponseSize.Update(responseSize)
  678. }
  679. }
  680. func (b *Broker) updateRequestLatencyMetrics(requestLatency time.Duration) {
  681. requestLatencyInMs := int64(requestLatency / time.Millisecond)
  682. b.requestLatency.Update(requestLatencyInMs)
  683. if b.brokerRequestLatency != nil {
  684. b.brokerRequestLatency.Update(requestLatencyInMs)
  685. }
  686. }
  687. func (b *Broker) updateOutgoingCommunicationMetrics(bytes int) {
  688. b.requestRate.Mark(1)
  689. if b.brokerRequestRate != nil {
  690. b.brokerRequestRate.Mark(1)
  691. }
  692. requestSize := int64(bytes)
  693. b.outgoingByteRate.Mark(requestSize)
  694. if b.brokerOutgoingByteRate != nil {
  695. b.brokerOutgoingByteRate.Mark(requestSize)
  696. }
  697. b.requestSize.Update(requestSize)
  698. if b.brokerRequestSize != nil {
  699. b.brokerRequestSize.Update(requestSize)
  700. }
  701. }