broker.go 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895
  1. package sarama
  2. import (
  3. "crypto/tls"
  4. "encoding/binary"
  5. "fmt"
  6. "io"
  7. "net"
  8. "strconv"
  9. "sync"
  10. "sync/atomic"
  11. "time"
  12. "github.com/rcrowley/go-metrics"
  13. )
  14. // Broker represents a single Kafka broker connection. All operations on this object are entirely concurrency-safe.
  15. type Broker struct {
  16. id int32
  17. addr string
  18. rack *string
  19. conf *Config
  20. correlationID int32
  21. conn net.Conn
  22. connErr error
  23. lock sync.Mutex
  24. opened int32
  25. responses chan responsePromise
  26. done chan bool
  27. incomingByteRate metrics.Meter
  28. requestRate metrics.Meter
  29. requestSize metrics.Histogram
  30. requestLatency metrics.Histogram
  31. outgoingByteRate metrics.Meter
  32. responseRate metrics.Meter
  33. responseSize metrics.Histogram
  34. brokerIncomingByteRate metrics.Meter
  35. brokerRequestRate metrics.Meter
  36. brokerRequestSize metrics.Histogram
  37. brokerRequestLatency metrics.Histogram
  38. brokerOutgoingByteRate metrics.Meter
  39. brokerResponseRate metrics.Meter
  40. brokerResponseSize metrics.Histogram
  41. }
  42. type responsePromise struct {
  43. requestTime time.Time
  44. correlationID int32
  45. packets chan []byte
  46. errors chan error
  47. }
  48. // NewBroker creates and returns a Broker targeting the given host:port address.
  49. // This does not attempt to actually connect, you have to call Open() for that.
  50. func NewBroker(addr string) *Broker {
  51. return &Broker{id: -1, addr: addr}
  52. }
  53. // Open tries to connect to the Broker if it is not already connected or connecting, but does not block
  54. // waiting for the connection to complete. This means that any subsequent operations on the broker will
  55. // block waiting for the connection to succeed or fail. To get the effect of a fully synchronous Open call,
  56. // follow it by a call to Connected(). The only errors Open will return directly are ConfigurationError or
  57. // AlreadyConnected. If conf is nil, the result of NewConfig() is used.
  58. func (b *Broker) Open(conf *Config) error {
  59. if !atomic.CompareAndSwapInt32(&b.opened, 0, 1) {
  60. return ErrAlreadyConnected
  61. }
  62. if conf == nil {
  63. conf = NewConfig()
  64. }
  65. err := conf.Validate()
  66. if err != nil {
  67. return err
  68. }
  69. b.lock.Lock()
  70. go withRecover(func() {
  71. defer b.lock.Unlock()
  72. dialer := net.Dialer{
  73. Timeout: conf.Net.DialTimeout,
  74. KeepAlive: conf.Net.KeepAlive,
  75. LocalAddr: conf.Net.LocalAddr,
  76. }
  77. if conf.Net.TLS.Enable {
  78. b.conn, b.connErr = tls.DialWithDialer(&dialer, "tcp", b.addr, conf.Net.TLS.Config)
  79. } else {
  80. b.conn, b.connErr = dialer.Dial("tcp", b.addr)
  81. }
  82. if b.connErr != nil {
  83. Logger.Printf("Failed to connect to broker %s: %s\n", b.addr, b.connErr)
  84. b.conn = nil
  85. atomic.StoreInt32(&b.opened, 0)
  86. return
  87. }
  88. b.conn = newBufConn(b.conn)
  89. b.conf = conf
  90. // Create or reuse the global metrics shared between brokers
  91. b.incomingByteRate = metrics.GetOrRegisterMeter("incoming-byte-rate", conf.MetricRegistry)
  92. b.requestRate = metrics.GetOrRegisterMeter("request-rate", conf.MetricRegistry)
  93. b.requestSize = getOrRegisterHistogram("request-size", conf.MetricRegistry)
  94. b.requestLatency = getOrRegisterHistogram("request-latency-in-ms", conf.MetricRegistry)
  95. b.outgoingByteRate = metrics.GetOrRegisterMeter("outgoing-byte-rate", conf.MetricRegistry)
  96. b.responseRate = metrics.GetOrRegisterMeter("response-rate", conf.MetricRegistry)
  97. b.responseSize = getOrRegisterHistogram("response-size", conf.MetricRegistry)
  98. // Do not gather metrics for seeded broker (only used during bootstrap) because they share
  99. // the same id (-1) and are already exposed through the global metrics above
  100. if b.id >= 0 {
  101. b.brokerIncomingByteRate = getOrRegisterBrokerMeter("incoming-byte-rate", b, conf.MetricRegistry)
  102. b.brokerRequestRate = getOrRegisterBrokerMeter("request-rate", b, conf.MetricRegistry)
  103. b.brokerRequestSize = getOrRegisterBrokerHistogram("request-size", b, conf.MetricRegistry)
  104. b.brokerRequestLatency = getOrRegisterBrokerHistogram("request-latency-in-ms", b, conf.MetricRegistry)
  105. b.brokerOutgoingByteRate = getOrRegisterBrokerMeter("outgoing-byte-rate", b, conf.MetricRegistry)
  106. b.brokerResponseRate = getOrRegisterBrokerMeter("response-rate", b, conf.MetricRegistry)
  107. b.brokerResponseSize = getOrRegisterBrokerHistogram("response-size", b, conf.MetricRegistry)
  108. }
  109. if conf.Net.SASL.Enable {
  110. b.connErr = b.sendAndReceiveSASLPlainAuth()
  111. if b.connErr != nil {
  112. err = b.conn.Close()
  113. if err == nil {
  114. Logger.Printf("Closed connection to broker %s\n", b.addr)
  115. } else {
  116. Logger.Printf("Error while closing connection to broker %s: %s\n", b.addr, err)
  117. }
  118. b.conn = nil
  119. atomic.StoreInt32(&b.opened, 0)
  120. return
  121. }
  122. }
  123. b.done = make(chan bool)
  124. b.responses = make(chan responsePromise, b.conf.Net.MaxOpenRequests-1)
  125. if b.id >= 0 {
  126. Logger.Printf("Connected to broker at %s (registered as #%d)\n", b.addr, b.id)
  127. } else {
  128. Logger.Printf("Connected to broker at %s (unregistered)\n", b.addr)
  129. }
  130. go withRecover(b.responseReceiver)
  131. })
  132. return nil
  133. }
  134. // Connected returns true if the broker is connected and false otherwise. If the broker is not
  135. // connected but it had tried to connect, the error from that connection attempt is also returned.
  136. func (b *Broker) Connected() (bool, error) {
  137. b.lock.Lock()
  138. defer b.lock.Unlock()
  139. return b.conn != nil, b.connErr
  140. }
  141. func (b *Broker) Close() error {
  142. b.lock.Lock()
  143. defer b.lock.Unlock()
  144. if b.conn == nil {
  145. return ErrNotConnected
  146. }
  147. close(b.responses)
  148. <-b.done
  149. err := b.conn.Close()
  150. b.conn = nil
  151. b.connErr = nil
  152. b.done = nil
  153. b.responses = nil
  154. if b.id >= 0 {
  155. b.conf.MetricRegistry.Unregister(getMetricNameForBroker("incoming-byte-rate", b))
  156. b.conf.MetricRegistry.Unregister(getMetricNameForBroker("request-rate", b))
  157. b.conf.MetricRegistry.Unregister(getMetricNameForBroker("outgoing-byte-rate", b))
  158. b.conf.MetricRegistry.Unregister(getMetricNameForBroker("response-rate", b))
  159. }
  160. if err == nil {
  161. Logger.Printf("Closed connection to broker %s\n", b.addr)
  162. } else {
  163. Logger.Printf("Error while closing connection to broker %s: %s\n", b.addr, err)
  164. }
  165. atomic.StoreInt32(&b.opened, 0)
  166. return err
  167. }
  168. // ID returns the broker ID retrieved from Kafka's metadata, or -1 if that is not known.
  169. func (b *Broker) ID() int32 {
  170. return b.id
  171. }
  172. // Addr returns the broker address as either retrieved from Kafka's metadata or passed to NewBroker.
  173. func (b *Broker) Addr() string {
  174. return b.addr
  175. }
  176. // Rack returns the broker's rack as retrieved from Kafka's metadata or the
  177. // empty string if it is not known. The returned value corresponds to the
  178. // broker's broker.rack configuration setting. Requires protocol version to be
  179. // at least v0.10.0.0.
  180. func (b *Broker) Rack() string {
  181. if b.rack == nil {
  182. return ""
  183. }
  184. return *b.rack
  185. }
  186. func (b *Broker) GetMetadata(request *MetadataRequest) (*MetadataResponse, error) {
  187. response := new(MetadataResponse)
  188. err := b.sendAndReceive(request, response)
  189. if err != nil {
  190. return nil, err
  191. }
  192. return response, nil
  193. }
  194. func (b *Broker) GetConsumerMetadata(request *ConsumerMetadataRequest) (*ConsumerMetadataResponse, error) {
  195. response := new(ConsumerMetadataResponse)
  196. err := b.sendAndReceive(request, response)
  197. if err != nil {
  198. return nil, err
  199. }
  200. return response, nil
  201. }
  202. func (b *Broker) FindCoordinator(request *FindCoordinatorRequest) (*FindCoordinatorResponse, error) {
  203. response := new(FindCoordinatorResponse)
  204. err := b.sendAndReceive(request, response)
  205. if err != nil {
  206. return nil, err
  207. }
  208. return response, nil
  209. }
  210. func (b *Broker) GetAvailableOffsets(request *OffsetRequest) (*OffsetResponse, error) {
  211. response := new(OffsetResponse)
  212. err := b.sendAndReceive(request, response)
  213. if err != nil {
  214. return nil, err
  215. }
  216. return response, nil
  217. }
  218. func (b *Broker) Produce(request *ProduceRequest) (*ProduceResponse, error) {
  219. var response *ProduceResponse
  220. var err error
  221. if request.RequiredAcks == NoResponse {
  222. err = b.sendAndReceive(request, nil)
  223. } else {
  224. response = new(ProduceResponse)
  225. err = b.sendAndReceive(request, response)
  226. }
  227. if err != nil {
  228. return nil, err
  229. }
  230. return response, nil
  231. }
  232. func (b *Broker) Fetch(request *FetchRequest) (*FetchResponse, error) {
  233. response := new(FetchResponse)
  234. err := b.sendAndReceive(request, response)
  235. if err != nil {
  236. return nil, err
  237. }
  238. return response, nil
  239. }
  240. func (b *Broker) CommitOffset(request *OffsetCommitRequest) (*OffsetCommitResponse, error) {
  241. response := new(OffsetCommitResponse)
  242. err := b.sendAndReceive(request, response)
  243. if err != nil {
  244. return nil, err
  245. }
  246. return response, nil
  247. }
  248. func (b *Broker) FetchOffset(request *OffsetFetchRequest) (*OffsetFetchResponse, error) {
  249. response := new(OffsetFetchResponse)
  250. err := b.sendAndReceive(request, response)
  251. if err != nil {
  252. return nil, err
  253. }
  254. return response, nil
  255. }
  256. func (b *Broker) JoinGroup(request *JoinGroupRequest) (*JoinGroupResponse, error) {
  257. response := new(JoinGroupResponse)
  258. err := b.sendAndReceive(request, response)
  259. if err != nil {
  260. return nil, err
  261. }
  262. return response, nil
  263. }
  264. func (b *Broker) SyncGroup(request *SyncGroupRequest) (*SyncGroupResponse, error) {
  265. response := new(SyncGroupResponse)
  266. err := b.sendAndReceive(request, response)
  267. if err != nil {
  268. return nil, err
  269. }
  270. return response, nil
  271. }
  272. func (b *Broker) LeaveGroup(request *LeaveGroupRequest) (*LeaveGroupResponse, error) {
  273. response := new(LeaveGroupResponse)
  274. err := b.sendAndReceive(request, response)
  275. if err != nil {
  276. return nil, err
  277. }
  278. return response, nil
  279. }
  280. func (b *Broker) Heartbeat(request *HeartbeatRequest) (*HeartbeatResponse, error) {
  281. response := new(HeartbeatResponse)
  282. err := b.sendAndReceive(request, response)
  283. if err != nil {
  284. return nil, err
  285. }
  286. return response, nil
  287. }
  288. func (b *Broker) ListGroups(request *ListGroupsRequest) (*ListGroupsResponse, error) {
  289. response := new(ListGroupsResponse)
  290. err := b.sendAndReceive(request, response)
  291. if err != nil {
  292. return nil, err
  293. }
  294. return response, nil
  295. }
  296. func (b *Broker) DescribeGroups(request *DescribeGroupsRequest) (*DescribeGroupsResponse, error) {
  297. response := new(DescribeGroupsResponse)
  298. err := b.sendAndReceive(request, response)
  299. if err != nil {
  300. return nil, err
  301. }
  302. return response, nil
  303. }
  304. func (b *Broker) ApiVersions(request *ApiVersionsRequest) (*ApiVersionsResponse, error) {
  305. response := new(ApiVersionsResponse)
  306. err := b.sendAndReceive(request, response)
  307. if err != nil {
  308. return nil, err
  309. }
  310. return response, nil
  311. }
  312. func (b *Broker) CreateTopics(request *CreateTopicsRequest) (*CreateTopicsResponse, error) {
  313. response := new(CreateTopicsResponse)
  314. err := b.sendAndReceive(request, response)
  315. if err != nil {
  316. return nil, err
  317. }
  318. return response, nil
  319. }
  320. func (b *Broker) DeleteTopics(request *DeleteTopicsRequest) (*DeleteTopicsResponse, error) {
  321. response := new(DeleteTopicsResponse)
  322. err := b.sendAndReceive(request, response)
  323. if err != nil {
  324. return nil, err
  325. }
  326. return response, nil
  327. }
  328. func (b *Broker) CreatePartitions(request *CreatePartitionsRequest) (*CreatePartitionsResponse, error) {
  329. response := new(CreatePartitionsResponse)
  330. err := b.sendAndReceive(request, response)
  331. if err != nil {
  332. return nil, err
  333. }
  334. return response, nil
  335. }
  336. func (b *Broker) DeleteRecords(request *DeleteRecordsRequest) (*DeleteRecordsResponse, error) {
  337. response := new(DeleteRecordsResponse)
  338. err := b.sendAndReceive(request, response)
  339. if err != nil {
  340. return nil, err
  341. }
  342. return response, nil
  343. }
  344. func (b *Broker) DescribeAcls(request *DescribeAclsRequest) (*DescribeAclsResponse, error) {
  345. response := new(DescribeAclsResponse)
  346. err := b.sendAndReceive(request, response)
  347. if err != nil {
  348. return nil, err
  349. }
  350. return response, nil
  351. }
  352. func (b *Broker) CreateAcls(request *CreateAclsRequest) (*CreateAclsResponse, error) {
  353. response := new(CreateAclsResponse)
  354. err := b.sendAndReceive(request, response)
  355. if err != nil {
  356. return nil, err
  357. }
  358. return response, nil
  359. }
  360. func (b *Broker) DeleteAcls(request *DeleteAclsRequest) (*DeleteAclsResponse, error) {
  361. response := new(DeleteAclsResponse)
  362. err := b.sendAndReceive(request, response)
  363. if err != nil {
  364. return nil, err
  365. }
  366. return response, nil
  367. }
  368. func (b *Broker) InitProducerID(request *InitProducerIDRequest) (*InitProducerIDResponse, error) {
  369. response := new(InitProducerIDResponse)
  370. err := b.sendAndReceive(request, response)
  371. if err != nil {
  372. return nil, err
  373. }
  374. return response, nil
  375. }
  376. func (b *Broker) AddPartitionsToTxn(request *AddPartitionsToTxnRequest) (*AddPartitionsToTxnResponse, error) {
  377. response := new(AddPartitionsToTxnResponse)
  378. err := b.sendAndReceive(request, response)
  379. if err != nil {
  380. return nil, err
  381. }
  382. return response, nil
  383. }
  384. func (b *Broker) AddOffsetsToTxn(request *AddOffsetsToTxnRequest) (*AddOffsetsToTxnResponse, error) {
  385. response := new(AddOffsetsToTxnResponse)
  386. err := b.sendAndReceive(request, response)
  387. if err != nil {
  388. return nil, err
  389. }
  390. return response, nil
  391. }
  392. func (b *Broker) EndTxn(request *EndTxnRequest) (*EndTxnResponse, error) {
  393. response := new(EndTxnResponse)
  394. err := b.sendAndReceive(request, response)
  395. if err != nil {
  396. return nil, err
  397. }
  398. return response, nil
  399. }
  400. func (b *Broker) TxnOffsetCommit(request *TxnOffsetCommitRequest) (*TxnOffsetCommitResponse, error) {
  401. response := new(TxnOffsetCommitResponse)
  402. err := b.sendAndReceive(request, response)
  403. if err != nil {
  404. return nil, err
  405. }
  406. return response, nil
  407. }
  408. func (b *Broker) DescribeConfigs(request *DescribeConfigsRequest) (*DescribeConfigsResponse, error) {
  409. response := new(DescribeConfigsResponse)
  410. err := b.sendAndReceive(request, response)
  411. if err != nil {
  412. return nil, err
  413. }
  414. return response, nil
  415. }
  416. func (b *Broker) AlterConfigs(request *AlterConfigsRequest) (*AlterConfigsResponse, error) {
  417. response := new(AlterConfigsResponse)
  418. err := b.sendAndReceive(request, response)
  419. if err != nil {
  420. return nil, err
  421. }
  422. return response, nil
  423. }
  424. func (b *Broker) DeleteGroups(request *DeleteGroupsRequest) (*DeleteGroupsResponse, error) {
  425. response := new(DeleteGroupsResponse)
  426. if err := b.sendAndReceive(request, response); err != nil {
  427. return nil, err
  428. }
  429. return response, nil
  430. }
  431. func (b *Broker) send(rb protocolBody, promiseResponse bool) (*responsePromise, error) {
  432. b.lock.Lock()
  433. defer b.lock.Unlock()
  434. if b.conn == nil {
  435. if b.connErr != nil {
  436. return nil, b.connErr
  437. }
  438. return nil, ErrNotConnected
  439. }
  440. if !b.conf.Version.IsAtLeast(rb.requiredVersion()) {
  441. return nil, ErrUnsupportedVersion
  442. }
  443. req := &request{correlationID: b.correlationID, clientID: b.conf.ClientID, body: rb}
  444. buf, err := encode(req, b.conf.MetricRegistry)
  445. if err != nil {
  446. return nil, err
  447. }
  448. err = b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout))
  449. if err != nil {
  450. return nil, err
  451. }
  452. requestTime := time.Now()
  453. bytes, err := b.conn.Write(buf)
  454. b.updateOutgoingCommunicationMetrics(bytes)
  455. if err != nil {
  456. return nil, err
  457. }
  458. b.correlationID++
  459. if !promiseResponse {
  460. // Record request latency without the response
  461. b.updateRequestLatencyMetrics(time.Since(requestTime))
  462. return nil, nil
  463. }
  464. promise := responsePromise{requestTime, req.correlationID, make(chan []byte), make(chan error)}
  465. b.responses <- promise
  466. return &promise, nil
  467. }
  468. func (b *Broker) sendAndReceive(req protocolBody, res versionedDecoder) error {
  469. promise, err := b.send(req, res != nil)
  470. if err != nil {
  471. return err
  472. }
  473. if promise == nil {
  474. return nil
  475. }
  476. select {
  477. case buf := <-promise.packets:
  478. return versionedDecode(buf, res, req.version())
  479. case err = <-promise.errors:
  480. return err
  481. }
  482. }
  483. func (b *Broker) decode(pd packetDecoder, version int16) (err error) {
  484. b.id, err = pd.getInt32()
  485. if err != nil {
  486. return err
  487. }
  488. host, err := pd.getString()
  489. if err != nil {
  490. return err
  491. }
  492. port, err := pd.getInt32()
  493. if err != nil {
  494. return err
  495. }
  496. if version >= 1 {
  497. b.rack, err = pd.getNullableString()
  498. if err != nil {
  499. return err
  500. }
  501. }
  502. b.addr = net.JoinHostPort(host, fmt.Sprint(port))
  503. if _, _, err := net.SplitHostPort(b.addr); err != nil {
  504. return err
  505. }
  506. return nil
  507. }
  508. func (b *Broker) encode(pe packetEncoder, version int16) (err error) {
  509. host, portstr, err := net.SplitHostPort(b.addr)
  510. if err != nil {
  511. return err
  512. }
  513. port, err := strconv.Atoi(portstr)
  514. if err != nil {
  515. return err
  516. }
  517. pe.putInt32(b.id)
  518. err = pe.putString(host)
  519. if err != nil {
  520. return err
  521. }
  522. pe.putInt32(int32(port))
  523. if version >= 1 {
  524. err = pe.putNullableString(b.rack)
  525. if err != nil {
  526. return err
  527. }
  528. }
  529. return nil
  530. }
  531. func (b *Broker) responseReceiver() {
  532. var dead error
  533. header := make([]byte, 8)
  534. for response := range b.responses {
  535. if dead != nil {
  536. response.errors <- dead
  537. continue
  538. }
  539. err := b.conn.SetReadDeadline(time.Now().Add(b.conf.Net.ReadTimeout))
  540. if err != nil {
  541. dead = err
  542. response.errors <- err
  543. continue
  544. }
  545. bytesReadHeader, err := io.ReadFull(b.conn, header)
  546. requestLatency := time.Since(response.requestTime)
  547. if err != nil {
  548. b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
  549. dead = err
  550. response.errors <- err
  551. continue
  552. }
  553. decodedHeader := responseHeader{}
  554. err = decode(header, &decodedHeader)
  555. if err != nil {
  556. b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
  557. dead = err
  558. response.errors <- err
  559. continue
  560. }
  561. if decodedHeader.correlationID != response.correlationID {
  562. b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
  563. // TODO if decoded ID < cur ID, discard until we catch up
  564. // TODO if decoded ID > cur ID, save it so when cur ID catches up we have a response
  565. dead = PacketDecodingError{fmt.Sprintf("correlation ID didn't match, wanted %d, got %d", response.correlationID, decodedHeader.correlationID)}
  566. response.errors <- dead
  567. continue
  568. }
  569. buf := make([]byte, decodedHeader.length-4)
  570. bytesReadBody, err := io.ReadFull(b.conn, buf)
  571. b.updateIncomingCommunicationMetrics(bytesReadHeader+bytesReadBody, requestLatency)
  572. if err != nil {
  573. dead = err
  574. response.errors <- err
  575. continue
  576. }
  577. response.packets <- buf
  578. }
  579. close(b.done)
  580. }
  581. func (b *Broker) sendAndReceiveSASLPlainHandshake() error {
  582. rb := &SaslHandshakeRequest{"PLAIN"}
  583. req := &request{correlationID: b.correlationID, clientID: b.conf.ClientID, body: rb}
  584. buf, err := encode(req, b.conf.MetricRegistry)
  585. if err != nil {
  586. return err
  587. }
  588. err = b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout))
  589. if err != nil {
  590. return err
  591. }
  592. requestTime := time.Now()
  593. bytes, err := b.conn.Write(buf)
  594. b.updateOutgoingCommunicationMetrics(bytes)
  595. if err != nil {
  596. Logger.Printf("Failed to send SASL handshake %s: %s\n", b.addr, err.Error())
  597. return err
  598. }
  599. b.correlationID++
  600. //wait for the response
  601. header := make([]byte, 8) // response header
  602. _, err = io.ReadFull(b.conn, header)
  603. if err != nil {
  604. Logger.Printf("Failed to read SASL handshake header : %s\n", err.Error())
  605. return err
  606. }
  607. length := binary.BigEndian.Uint32(header[:4])
  608. payload := make([]byte, length-4)
  609. n, err := io.ReadFull(b.conn, payload)
  610. if err != nil {
  611. Logger.Printf("Failed to read SASL handshake payload : %s\n", err.Error())
  612. return err
  613. }
  614. b.updateIncomingCommunicationMetrics(n+8, time.Since(requestTime))
  615. res := &SaslHandshakeResponse{}
  616. err = versionedDecode(payload, res, 0)
  617. if err != nil {
  618. Logger.Printf("Failed to parse SASL handshake : %s\n", err.Error())
  619. return err
  620. }
  621. if res.Err != ErrNoError {
  622. Logger.Printf("Invalid SASL Mechanism : %s\n", res.Err.Error())
  623. return res.Err
  624. }
  625. Logger.Print("Successful SASL handshake")
  626. return nil
  627. }
  628. // Kafka 0.10.0 plans to support SASL Plain and Kerberos as per PR #812 (KIP-43)/(JIRA KAFKA-3149)
  629. // Some hosted kafka services such as IBM Message Hub already offer SASL/PLAIN auth with Kafka 0.9
  630. //
  631. // In SASL Plain, Kafka expects the auth header to be in the following format
  632. // Message format (from https://tools.ietf.org/html/rfc4616):
  633. //
  634. // message = [authzid] UTF8NUL authcid UTF8NUL passwd
  635. // authcid = 1*SAFE ; MUST accept up to 255 octets
  636. // authzid = 1*SAFE ; MUST accept up to 255 octets
  637. // passwd = 1*SAFE ; MUST accept up to 255 octets
  638. // UTF8NUL = %x00 ; UTF-8 encoded NUL character
  639. //
  640. // SAFE = UTF1 / UTF2 / UTF3 / UTF4
  641. // ;; any UTF-8 encoded Unicode character except NUL
  642. //
  643. // When credentials are valid, Kafka returns a 4 byte array of null characters.
  644. // When credentials are invalid, Kafka closes the connection. This does not seem to be the ideal way
  645. // of responding to bad credentials but thats how its being done today.
  646. func (b *Broker) sendAndReceiveSASLPlainAuth() error {
  647. if b.conf.Net.SASL.Handshake {
  648. handshakeErr := b.sendAndReceiveSASLPlainHandshake()
  649. if handshakeErr != nil {
  650. Logger.Printf("Error while performing SASL handshake %s\n", b.addr)
  651. return handshakeErr
  652. }
  653. }
  654. length := 1 + len(b.conf.Net.SASL.User) + 1 + len(b.conf.Net.SASL.Password)
  655. authBytes := make([]byte, length+4) //4 byte length header + auth data
  656. binary.BigEndian.PutUint32(authBytes, uint32(length))
  657. copy(authBytes[4:], []byte("\x00"+b.conf.Net.SASL.User+"\x00"+b.conf.Net.SASL.Password))
  658. err := b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout))
  659. if err != nil {
  660. Logger.Printf("Failed to set write deadline when doing SASL auth with broker %s: %s\n", b.addr, err.Error())
  661. return err
  662. }
  663. requestTime := time.Now()
  664. bytesWritten, err := b.conn.Write(authBytes)
  665. b.updateOutgoingCommunicationMetrics(bytesWritten)
  666. if err != nil {
  667. Logger.Printf("Failed to write SASL auth header to broker %s: %s\n", b.addr, err.Error())
  668. return err
  669. }
  670. header := make([]byte, 4)
  671. n, err := io.ReadFull(b.conn, header)
  672. b.updateIncomingCommunicationMetrics(n, time.Since(requestTime))
  673. // If the credentials are valid, we would get a 4 byte response filled with null characters.
  674. // Otherwise, the broker closes the connection and we get an EOF
  675. if err != nil {
  676. Logger.Printf("Failed to read response while authenticating with SASL to broker %s: %s\n", b.addr, err.Error())
  677. return err
  678. }
  679. Logger.Printf("SASL authentication successful with broker %s:%v - %v\n", b.addr, n, header)
  680. return nil
  681. }
  682. func (b *Broker) updateIncomingCommunicationMetrics(bytes int, requestLatency time.Duration) {
  683. b.updateRequestLatencyMetrics(requestLatency)
  684. b.responseRate.Mark(1)
  685. if b.brokerResponseRate != nil {
  686. b.brokerResponseRate.Mark(1)
  687. }
  688. responseSize := int64(bytes)
  689. b.incomingByteRate.Mark(responseSize)
  690. if b.brokerIncomingByteRate != nil {
  691. b.brokerIncomingByteRate.Mark(responseSize)
  692. }
  693. b.responseSize.Update(responseSize)
  694. if b.brokerResponseSize != nil {
  695. b.brokerResponseSize.Update(responseSize)
  696. }
  697. }
  698. func (b *Broker) updateRequestLatencyMetrics(requestLatency time.Duration) {
  699. requestLatencyInMs := int64(requestLatency / time.Millisecond)
  700. b.requestLatency.Update(requestLatencyInMs)
  701. if b.brokerRequestLatency != nil {
  702. b.brokerRequestLatency.Update(requestLatencyInMs)
  703. }
  704. }
  705. func (b *Broker) updateOutgoingCommunicationMetrics(bytes int) {
  706. b.requestRate.Mark(1)
  707. if b.brokerRequestRate != nil {
  708. b.brokerRequestRate.Mark(1)
  709. }
  710. requestSize := int64(bytes)
  711. b.outgoingByteRate.Mark(requestSize)
  712. if b.brokerOutgoingByteRate != nil {
  713. b.brokerOutgoingByteRate.Mark(requestSize)
  714. }
  715. b.requestSize.Update(requestSize)
  716. if b.brokerRequestSize != nil {
  717. b.brokerRequestSize.Update(requestSize)
  718. }
  719. }