broker.go 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883
  1. package sarama
  2. import (
  3. "crypto/tls"
  4. "encoding/binary"
  5. "fmt"
  6. "io"
  7. "net"
  8. "strconv"
  9. "sync"
  10. "sync/atomic"
  11. "time"
  12. "github.com/rcrowley/go-metrics"
  13. )
  14. // Broker represents a single Kafka broker connection. All operations on this object are entirely concurrency-safe.
  15. type Broker struct {
  16. id int32
  17. addr string
  18. rack *string
  19. conf *Config
  20. correlationID int32
  21. conn net.Conn
  22. connErr error
  23. lock sync.Mutex
  24. opened int32
  25. responses chan responsePromise
  26. done chan bool
  27. incomingByteRate metrics.Meter
  28. requestRate metrics.Meter
  29. requestSize metrics.Histogram
  30. requestLatency metrics.Histogram
  31. outgoingByteRate metrics.Meter
  32. responseRate metrics.Meter
  33. responseSize metrics.Histogram
  34. brokerIncomingByteRate metrics.Meter
  35. brokerRequestRate metrics.Meter
  36. brokerRequestSize metrics.Histogram
  37. brokerRequestLatency metrics.Histogram
  38. brokerOutgoingByteRate metrics.Meter
  39. brokerResponseRate metrics.Meter
  40. brokerResponseSize metrics.Histogram
  41. }
  42. type responsePromise struct {
  43. requestTime time.Time
  44. correlationID int32
  45. packets chan []byte
  46. errors chan error
  47. }
  48. // NewBroker creates and returns a Broker targeting the given host:port address.
  49. // This does not attempt to actually connect, you have to call Open() for that.
  50. func NewBroker(addr string) *Broker {
  51. return &Broker{id: -1, addr: addr}
  52. }
  53. // Open tries to connect to the Broker if it is not already connected or connecting, but does not block
  54. // waiting for the connection to complete. This means that any subsequent operations on the broker will
  55. // block waiting for the connection to succeed or fail. To get the effect of a fully synchronous Open call,
  56. // follow it by a call to Connected(). The only errors Open will return directly are ConfigurationError or
  57. // AlreadyConnected. If conf is nil, the result of NewConfig() is used.
  58. func (b *Broker) Open(conf *Config) error {
  59. if !atomic.CompareAndSwapInt32(&b.opened, 0, 1) {
  60. return ErrAlreadyConnected
  61. }
  62. if conf == nil {
  63. conf = NewConfig()
  64. }
  65. err := conf.Validate()
  66. if err != nil {
  67. return err
  68. }
  69. b.lock.Lock()
  70. go withRecover(func() {
  71. defer b.lock.Unlock()
  72. dialer := net.Dialer{
  73. Timeout: conf.Net.DialTimeout,
  74. KeepAlive: conf.Net.KeepAlive,
  75. }
  76. if conf.Net.TLS.Enable {
  77. b.conn, b.connErr = tls.DialWithDialer(&dialer, "tcp", b.addr, conf.Net.TLS.Config)
  78. } else {
  79. b.conn, b.connErr = dialer.Dial("tcp", b.addr)
  80. }
  81. if b.connErr != nil {
  82. Logger.Printf("Failed to connect to broker %s: %s\n", b.addr, b.connErr)
  83. b.conn = nil
  84. atomic.StoreInt32(&b.opened, 0)
  85. return
  86. }
  87. b.conn = newBufConn(b.conn)
  88. b.conf = conf
  89. // Create or reuse the global metrics shared between brokers
  90. b.incomingByteRate = metrics.GetOrRegisterMeter("incoming-byte-rate", conf.MetricRegistry)
  91. b.requestRate = metrics.GetOrRegisterMeter("request-rate", conf.MetricRegistry)
  92. b.requestSize = getOrRegisterHistogram("request-size", conf.MetricRegistry)
  93. b.requestLatency = getOrRegisterHistogram("request-latency-in-ms", conf.MetricRegistry)
  94. b.outgoingByteRate = metrics.GetOrRegisterMeter("outgoing-byte-rate", conf.MetricRegistry)
  95. b.responseRate = metrics.GetOrRegisterMeter("response-rate", conf.MetricRegistry)
  96. b.responseSize = getOrRegisterHistogram("response-size", conf.MetricRegistry)
  97. // Do not gather metrics for seeded broker (only used during bootstrap) because they share
  98. // the same id (-1) and are already exposed through the global metrics above
  99. if b.id >= 0 {
  100. b.brokerIncomingByteRate = getOrRegisterBrokerMeter("incoming-byte-rate", b, conf.MetricRegistry)
  101. b.brokerRequestRate = getOrRegisterBrokerMeter("request-rate", b, conf.MetricRegistry)
  102. b.brokerRequestSize = getOrRegisterBrokerHistogram("request-size", b, conf.MetricRegistry)
  103. b.brokerRequestLatency = getOrRegisterBrokerHistogram("request-latency-in-ms", b, conf.MetricRegistry)
  104. b.brokerOutgoingByteRate = getOrRegisterBrokerMeter("outgoing-byte-rate", b, conf.MetricRegistry)
  105. b.brokerResponseRate = getOrRegisterBrokerMeter("response-rate", b, conf.MetricRegistry)
  106. b.brokerResponseSize = getOrRegisterBrokerHistogram("response-size", b, conf.MetricRegistry)
  107. }
  108. if conf.Net.SASL.Enable {
  109. b.connErr = b.sendAndReceiveSASLPlainAuth()
  110. if b.connErr != nil {
  111. err = b.conn.Close()
  112. if err == nil {
  113. Logger.Printf("Closed connection to broker %s\n", b.addr)
  114. } else {
  115. Logger.Printf("Error while closing connection to broker %s: %s\n", b.addr, err)
  116. }
  117. b.conn = nil
  118. atomic.StoreInt32(&b.opened, 0)
  119. return
  120. }
  121. }
  122. b.done = make(chan bool)
  123. b.responses = make(chan responsePromise, b.conf.Net.MaxOpenRequests-1)
  124. if b.id >= 0 {
  125. Logger.Printf("Connected to broker at %s (registered as #%d)\n", b.addr, b.id)
  126. } else {
  127. Logger.Printf("Connected to broker at %s (unregistered)\n", b.addr)
  128. }
  129. go withRecover(b.responseReceiver)
  130. })
  131. return nil
  132. }
  133. // Connected returns true if the broker is connected and false otherwise. If the broker is not
  134. // connected but it had tried to connect, the error from that connection attempt is also returned.
  135. func (b *Broker) Connected() (bool, error) {
  136. b.lock.Lock()
  137. defer b.lock.Unlock()
  138. return b.conn != nil, b.connErr
  139. }
  140. func (b *Broker) Close() error {
  141. b.lock.Lock()
  142. defer b.lock.Unlock()
  143. if b.conn == nil {
  144. return ErrNotConnected
  145. }
  146. close(b.responses)
  147. <-b.done
  148. err := b.conn.Close()
  149. b.conn = nil
  150. b.connErr = nil
  151. b.done = nil
  152. b.responses = nil
  153. if b.id >= 0 {
  154. b.conf.MetricRegistry.Unregister(getMetricNameForBroker("incoming-byte-rate", b))
  155. b.conf.MetricRegistry.Unregister(getMetricNameForBroker("request-rate", b))
  156. b.conf.MetricRegistry.Unregister(getMetricNameForBroker("outgoing-byte-rate", b))
  157. b.conf.MetricRegistry.Unregister(getMetricNameForBroker("response-rate", b))
  158. }
  159. if err == nil {
  160. Logger.Printf("Closed connection to broker %s\n", b.addr)
  161. } else {
  162. Logger.Printf("Error while closing connection to broker %s: %s\n", b.addr, err)
  163. }
  164. atomic.StoreInt32(&b.opened, 0)
  165. return err
  166. }
  167. // ID returns the broker ID retrieved from Kafka's metadata, or -1 if that is not known.
  168. func (b *Broker) ID() int32 {
  169. return b.id
  170. }
  171. // Addr returns the broker address as either retrieved from Kafka's metadata or passed to NewBroker.
  172. func (b *Broker) Addr() string {
  173. return b.addr
  174. }
  175. func (b *Broker) GetMetadata(request *MetadataRequest) (*MetadataResponse, error) {
  176. response := new(MetadataResponse)
  177. err := b.sendAndReceive(request, response)
  178. if err != nil {
  179. return nil, err
  180. }
  181. return response, nil
  182. }
  183. func (b *Broker) GetConsumerMetadata(request *ConsumerMetadataRequest) (*ConsumerMetadataResponse, error) {
  184. response := new(ConsumerMetadataResponse)
  185. err := b.sendAndReceive(request, response)
  186. if err != nil {
  187. return nil, err
  188. }
  189. return response, nil
  190. }
  191. func (b *Broker) FindCoordinator(request *FindCoordinatorRequest) (*FindCoordinatorResponse, error) {
  192. response := new(FindCoordinatorResponse)
  193. err := b.sendAndReceive(request, response)
  194. if err != nil {
  195. return nil, err
  196. }
  197. return response, nil
  198. }
  199. func (b *Broker) GetAvailableOffsets(request *OffsetRequest) (*OffsetResponse, error) {
  200. response := new(OffsetResponse)
  201. err := b.sendAndReceive(request, response)
  202. if err != nil {
  203. return nil, err
  204. }
  205. return response, nil
  206. }
  207. func (b *Broker) Produce(request *ProduceRequest) (*ProduceResponse, error) {
  208. var response *ProduceResponse
  209. var err error
  210. if request.RequiredAcks == NoResponse {
  211. err = b.sendAndReceive(request, nil)
  212. } else {
  213. response = new(ProduceResponse)
  214. err = b.sendAndReceive(request, response)
  215. }
  216. if err != nil {
  217. return nil, err
  218. }
  219. return response, nil
  220. }
  221. func (b *Broker) Fetch(request *FetchRequest) (*FetchResponse, error) {
  222. response := new(FetchResponse)
  223. err := b.sendAndReceive(request, response)
  224. if err != nil {
  225. return nil, err
  226. }
  227. return response, nil
  228. }
  229. func (b *Broker) CommitOffset(request *OffsetCommitRequest) (*OffsetCommitResponse, error) {
  230. response := new(OffsetCommitResponse)
  231. err := b.sendAndReceive(request, response)
  232. if err != nil {
  233. return nil, err
  234. }
  235. return response, nil
  236. }
  237. func (b *Broker) FetchOffset(request *OffsetFetchRequest) (*OffsetFetchResponse, error) {
  238. response := new(OffsetFetchResponse)
  239. err := b.sendAndReceive(request, response)
  240. if err != nil {
  241. return nil, err
  242. }
  243. return response, nil
  244. }
  245. func (b *Broker) JoinGroup(request *JoinGroupRequest) (*JoinGroupResponse, error) {
  246. response := new(JoinGroupResponse)
  247. err := b.sendAndReceive(request, response)
  248. if err != nil {
  249. return nil, err
  250. }
  251. return response, nil
  252. }
  253. func (b *Broker) SyncGroup(request *SyncGroupRequest) (*SyncGroupResponse, error) {
  254. response := new(SyncGroupResponse)
  255. err := b.sendAndReceive(request, response)
  256. if err != nil {
  257. return nil, err
  258. }
  259. return response, nil
  260. }
  261. func (b *Broker) LeaveGroup(request *LeaveGroupRequest) (*LeaveGroupResponse, error) {
  262. response := new(LeaveGroupResponse)
  263. err := b.sendAndReceive(request, response)
  264. if err != nil {
  265. return nil, err
  266. }
  267. return response, nil
  268. }
  269. func (b *Broker) Heartbeat(request *HeartbeatRequest) (*HeartbeatResponse, error) {
  270. response := new(HeartbeatResponse)
  271. err := b.sendAndReceive(request, response)
  272. if err != nil {
  273. return nil, err
  274. }
  275. return response, nil
  276. }
  277. func (b *Broker) ListGroups(request *ListGroupsRequest) (*ListGroupsResponse, error) {
  278. response := new(ListGroupsResponse)
  279. err := b.sendAndReceive(request, response)
  280. if err != nil {
  281. return nil, err
  282. }
  283. return response, nil
  284. }
  285. func (b *Broker) DescribeGroups(request *DescribeGroupsRequest) (*DescribeGroupsResponse, error) {
  286. response := new(DescribeGroupsResponse)
  287. err := b.sendAndReceive(request, response)
  288. if err != nil {
  289. return nil, err
  290. }
  291. return response, nil
  292. }
  293. func (b *Broker) ApiVersions(request *ApiVersionsRequest) (*ApiVersionsResponse, error) {
  294. response := new(ApiVersionsResponse)
  295. err := b.sendAndReceive(request, response)
  296. if err != nil {
  297. return nil, err
  298. }
  299. return response, nil
  300. }
  301. func (b *Broker) CreatePartitions(request *CreatePartitionsRequest) (*CreatePartitionsResponse, error) {
  302. response := new(CreatePartitionsResponse)
  303. err := b.sendAndReceive(request, response)
  304. if err != nil {
  305. return nil, err
  306. }
  307. return response, nil
  308. }
  309. func (b *Broker) CreateTopics(request *CreateTopicsRequest) (*CreateTopicsResponse, error) {
  310. response := new(CreateTopicsResponse)
  311. err := b.sendAndReceive(request, response)
  312. if err != nil {
  313. return nil, err
  314. }
  315. return response, nil
  316. }
  317. func (b *Broker) DeleteTopics(request *DeleteTopicsRequest) (*DeleteTopicsResponse, error) {
  318. response := new(DeleteTopicsResponse)
  319. err := b.sendAndReceive(request, response)
  320. if err != nil {
  321. return nil, err
  322. }
  323. return response, nil
  324. }
  325. func (b *Broker) DeleteRecords(request *DeleteRecordsRequest) (*DeleteRecordsResponse, error) {
  326. response := new(DeleteRecordsResponse)
  327. err := b.sendAndReceive(request, response)
  328. if err != nil {
  329. return nil, err
  330. }
  331. return response, nil
  332. }
  333. func (b *Broker) DescribeAcls(request *DescribeAclsRequest) (*DescribeAclsResponse, error) {
  334. response := new(DescribeAclsResponse)
  335. err := b.sendAndReceive(request, response)
  336. if err != nil {
  337. return nil, err
  338. }
  339. return response, nil
  340. }
  341. func (b *Broker) CreateAcls(request *CreateAclsRequest) (*CreateAclsResponse, error) {
  342. response := new(CreateAclsResponse)
  343. err := b.sendAndReceive(request, response)
  344. if err != nil {
  345. return nil, err
  346. }
  347. return response, nil
  348. }
  349. func (b *Broker) DeleteAcls(request *DeleteAclsRequest) (*DeleteAclsResponse, error) {
  350. response := new(DeleteAclsResponse)
  351. err := b.sendAndReceive(request, response)
  352. if err != nil {
  353. return nil, err
  354. }
  355. return response, nil
  356. }
  357. func (b *Broker) InitProducerID(request *InitProducerIDRequest) (*InitProducerIDResponse, error) {
  358. response := new(InitProducerIDResponse)
  359. err := b.sendAndReceive(request, response)
  360. if err != nil {
  361. return nil, err
  362. }
  363. return response, nil
  364. }
  365. func (b *Broker) AddPartitionsToTxn(request *AddPartitionsToTxnRequest) (*AddPartitionsToTxnResponse, error) {
  366. response := new(AddPartitionsToTxnResponse)
  367. err := b.sendAndReceive(request, response)
  368. if err != nil {
  369. return nil, err
  370. }
  371. return response, nil
  372. }
  373. func (b *Broker) AddOffsetsToTxn(request *AddOffsetsToTxnRequest) (*AddOffsetsToTxnResponse, error) {
  374. response := new(AddOffsetsToTxnResponse)
  375. err := b.sendAndReceive(request, response)
  376. if err != nil {
  377. return nil, err
  378. }
  379. return response, nil
  380. }
  381. func (b *Broker) EndTxn(request *EndTxnRequest) (*EndTxnResponse, error) {
  382. response := new(EndTxnResponse)
  383. err := b.sendAndReceive(request, response)
  384. if err != nil {
  385. return nil, err
  386. }
  387. return response, nil
  388. }
  389. func (b *Broker) TxnOffsetCommit(request *TxnOffsetCommitRequest) (*TxnOffsetCommitResponse, error) {
  390. response := new(TxnOffsetCommitResponse)
  391. err := b.sendAndReceive(request, response)
  392. if err != nil {
  393. return nil, err
  394. }
  395. return response, nil
  396. }
  397. func (b *Broker) DescribeConfigs(request *DescribeConfigsRequest) (*DescribeConfigsResponse, error) {
  398. response := new(DescribeConfigsResponse)
  399. err := b.sendAndReceive(request, response)
  400. if err != nil {
  401. return nil, err
  402. }
  403. return response, nil
  404. }
  405. func (b *Broker) AlterConfigs(request *AlterConfigsRequest) (*AlterConfigsResponse, error) {
  406. response := new(AlterConfigsResponse)
  407. err := b.sendAndReceive(request, response)
  408. if err != nil {
  409. return nil, err
  410. }
  411. return response, nil
  412. }
  413. func (b *Broker) DeleteGroups(request *DeleteGroupsRequest) (*DeleteGroupsResponse, error) {
  414. response := new(DeleteGroupsResponse)
  415. if err := b.sendAndReceive(request, response); err != nil {
  416. return nil, err
  417. }
  418. return response, nil
  419. }
  420. func (b *Broker) send(rb protocolBody, promiseResponse bool) (*responsePromise, error) {
  421. b.lock.Lock()
  422. defer b.lock.Unlock()
  423. if b.conn == nil {
  424. if b.connErr != nil {
  425. return nil, b.connErr
  426. }
  427. return nil, ErrNotConnected
  428. }
  429. if !b.conf.Version.IsAtLeast(rb.requiredVersion()) {
  430. return nil, ErrUnsupportedVersion
  431. }
  432. req := &request{correlationID: b.correlationID, clientID: b.conf.ClientID, body: rb}
  433. buf, err := encode(req, b.conf.MetricRegistry)
  434. if err != nil {
  435. return nil, err
  436. }
  437. err = b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout))
  438. if err != nil {
  439. return nil, err
  440. }
  441. requestTime := time.Now()
  442. bytes, err := b.conn.Write(buf)
  443. b.updateOutgoingCommunicationMetrics(bytes)
  444. if err != nil {
  445. return nil, err
  446. }
  447. b.correlationID++
  448. if !promiseResponse {
  449. // Record request latency without the response
  450. b.updateRequestLatencyMetrics(time.Since(requestTime))
  451. return nil, nil
  452. }
  453. promise := responsePromise{requestTime, req.correlationID, make(chan []byte), make(chan error)}
  454. b.responses <- promise
  455. return &promise, nil
  456. }
  457. func (b *Broker) sendAndReceive(req protocolBody, res versionedDecoder) error {
  458. promise, err := b.send(req, res != nil)
  459. if err != nil {
  460. return err
  461. }
  462. if promise == nil {
  463. return nil
  464. }
  465. select {
  466. case buf := <-promise.packets:
  467. return versionedDecode(buf, res, req.version())
  468. case err = <-promise.errors:
  469. return err
  470. }
  471. }
  472. func (b *Broker) decode(pd packetDecoder, version int16) (err error) {
  473. b.id, err = pd.getInt32()
  474. if err != nil {
  475. return err
  476. }
  477. host, err := pd.getString()
  478. if err != nil {
  479. return err
  480. }
  481. port, err := pd.getInt32()
  482. if err != nil {
  483. return err
  484. }
  485. if version >= 1 {
  486. b.rack, err = pd.getNullableString()
  487. if err != nil {
  488. return err
  489. }
  490. }
  491. b.addr = net.JoinHostPort(host, fmt.Sprint(port))
  492. if _, _, err := net.SplitHostPort(b.addr); err != nil {
  493. return err
  494. }
  495. return nil
  496. }
  497. func (b *Broker) encode(pe packetEncoder, version int16) (err error) {
  498. host, portstr, err := net.SplitHostPort(b.addr)
  499. if err != nil {
  500. return err
  501. }
  502. port, err := strconv.Atoi(portstr)
  503. if err != nil {
  504. return err
  505. }
  506. pe.putInt32(b.id)
  507. err = pe.putString(host)
  508. if err != nil {
  509. return err
  510. }
  511. pe.putInt32(int32(port))
  512. if version >= 1 {
  513. err = pe.putNullableString(b.rack)
  514. if err != nil {
  515. return err
  516. }
  517. }
  518. return nil
  519. }
  520. func (b *Broker) responseReceiver() {
  521. var dead error
  522. header := make([]byte, 8)
  523. for response := range b.responses {
  524. if dead != nil {
  525. response.errors <- dead
  526. continue
  527. }
  528. err := b.conn.SetReadDeadline(time.Now().Add(b.conf.Net.ReadTimeout))
  529. if err != nil {
  530. dead = err
  531. response.errors <- err
  532. continue
  533. }
  534. bytesReadHeader, err := io.ReadFull(b.conn, header)
  535. requestLatency := time.Since(response.requestTime)
  536. if err != nil {
  537. b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
  538. dead = err
  539. response.errors <- err
  540. continue
  541. }
  542. decodedHeader := responseHeader{}
  543. err = decode(header, &decodedHeader)
  544. if err != nil {
  545. b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
  546. dead = err
  547. response.errors <- err
  548. continue
  549. }
  550. if decodedHeader.correlationID != response.correlationID {
  551. b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
  552. // TODO if decoded ID < cur ID, discard until we catch up
  553. // TODO if decoded ID > cur ID, save it so when cur ID catches up we have a response
  554. dead = PacketDecodingError{fmt.Sprintf("correlation ID didn't match, wanted %d, got %d", response.correlationID, decodedHeader.correlationID)}
  555. response.errors <- dead
  556. continue
  557. }
  558. buf := make([]byte, decodedHeader.length-4)
  559. bytesReadBody, err := io.ReadFull(b.conn, buf)
  560. b.updateIncomingCommunicationMetrics(bytesReadHeader+bytesReadBody, requestLatency)
  561. if err != nil {
  562. dead = err
  563. response.errors <- err
  564. continue
  565. }
  566. response.packets <- buf
  567. }
  568. close(b.done)
  569. }
  570. func (b *Broker) sendAndReceiveSASLPlainHandshake() error {
  571. rb := &SaslHandshakeRequest{"PLAIN"}
  572. req := &request{correlationID: b.correlationID, clientID: b.conf.ClientID, body: rb}
  573. buf, err := encode(req, b.conf.MetricRegistry)
  574. if err != nil {
  575. return err
  576. }
  577. err = b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout))
  578. if err != nil {
  579. return err
  580. }
  581. requestTime := time.Now()
  582. bytes, err := b.conn.Write(buf)
  583. b.updateOutgoingCommunicationMetrics(bytes)
  584. if err != nil {
  585. Logger.Printf("Failed to send SASL handshake %s: %s\n", b.addr, err.Error())
  586. return err
  587. }
  588. b.correlationID++
  589. //wait for the response
  590. header := make([]byte, 8) // response header
  591. _, err = io.ReadFull(b.conn, header)
  592. if err != nil {
  593. Logger.Printf("Failed to read SASL handshake header : %s\n", err.Error())
  594. return err
  595. }
  596. length := binary.BigEndian.Uint32(header[:4])
  597. payload := make([]byte, length-4)
  598. n, err := io.ReadFull(b.conn, payload)
  599. if err != nil {
  600. Logger.Printf("Failed to read SASL handshake payload : %s\n", err.Error())
  601. return err
  602. }
  603. b.updateIncomingCommunicationMetrics(n+8, time.Since(requestTime))
  604. res := &SaslHandshakeResponse{}
  605. err = versionedDecode(payload, res, 0)
  606. if err != nil {
  607. Logger.Printf("Failed to parse SASL handshake : %s\n", err.Error())
  608. return err
  609. }
  610. if res.Err != ErrNoError {
  611. Logger.Printf("Invalid SASL Mechanism : %s\n", res.Err.Error())
  612. return res.Err
  613. }
  614. Logger.Print("Successful SASL handshake")
  615. return nil
  616. }
  617. // Kafka 0.10.0 plans to support SASL Plain and Kerberos as per PR #812 (KIP-43)/(JIRA KAFKA-3149)
  618. // Some hosted kafka services such as IBM Message Hub already offer SASL/PLAIN auth with Kafka 0.9
  619. //
  620. // In SASL Plain, Kafka expects the auth header to be in the following format
  621. // Message format (from https://tools.ietf.org/html/rfc4616):
  622. //
  623. // message = [authzid] UTF8NUL authcid UTF8NUL passwd
  624. // authcid = 1*SAFE ; MUST accept up to 255 octets
  625. // authzid = 1*SAFE ; MUST accept up to 255 octets
  626. // passwd = 1*SAFE ; MUST accept up to 255 octets
  627. // UTF8NUL = %x00 ; UTF-8 encoded NUL character
  628. //
  629. // SAFE = UTF1 / UTF2 / UTF3 / UTF4
  630. // ;; any UTF-8 encoded Unicode character except NUL
  631. //
  632. // When credentials are valid, Kafka returns a 4 byte array of null characters.
  633. // When credentials are invalid, Kafka closes the connection. This does not seem to be the ideal way
  634. // of responding to bad credentials but thats how its being done today.
  635. func (b *Broker) sendAndReceiveSASLPlainAuth() error {
  636. if b.conf.Net.SASL.Handshake {
  637. handshakeErr := b.sendAndReceiveSASLPlainHandshake()
  638. if handshakeErr != nil {
  639. Logger.Printf("Error while performing SASL handshake %s\n", b.addr)
  640. return handshakeErr
  641. }
  642. }
  643. length := 1 + len(b.conf.Net.SASL.User) + 1 + len(b.conf.Net.SASL.Password)
  644. authBytes := make([]byte, length+4) //4 byte length header + auth data
  645. binary.BigEndian.PutUint32(authBytes, uint32(length))
  646. copy(authBytes[4:], []byte("\x00"+b.conf.Net.SASL.User+"\x00"+b.conf.Net.SASL.Password))
  647. err := b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout))
  648. if err != nil {
  649. Logger.Printf("Failed to set write deadline when doing SASL auth with broker %s: %s\n", b.addr, err.Error())
  650. return err
  651. }
  652. requestTime := time.Now()
  653. bytesWritten, err := b.conn.Write(authBytes)
  654. b.updateOutgoingCommunicationMetrics(bytesWritten)
  655. if err != nil {
  656. Logger.Printf("Failed to write SASL auth header to broker %s: %s\n", b.addr, err.Error())
  657. return err
  658. }
  659. header := make([]byte, 4)
  660. n, err := io.ReadFull(b.conn, header)
  661. b.updateIncomingCommunicationMetrics(n, time.Since(requestTime))
  662. // If the credentials are valid, we would get a 4 byte response filled with null characters.
  663. // Otherwise, the broker closes the connection and we get an EOF
  664. if err != nil {
  665. Logger.Printf("Failed to read response while authenticating with SASL to broker %s: %s\n", b.addr, err.Error())
  666. return err
  667. }
  668. Logger.Printf("SASL authentication successful with broker %s:%v - %v\n", b.addr, n, header)
  669. return nil
  670. }
  671. func (b *Broker) updateIncomingCommunicationMetrics(bytes int, requestLatency time.Duration) {
  672. b.updateRequestLatencyMetrics(requestLatency)
  673. b.responseRate.Mark(1)
  674. if b.brokerResponseRate != nil {
  675. b.brokerResponseRate.Mark(1)
  676. }
  677. responseSize := int64(bytes)
  678. b.incomingByteRate.Mark(responseSize)
  679. if b.brokerIncomingByteRate != nil {
  680. b.brokerIncomingByteRate.Mark(responseSize)
  681. }
  682. b.responseSize.Update(responseSize)
  683. if b.brokerResponseSize != nil {
  684. b.brokerResponseSize.Update(responseSize)
  685. }
  686. }
  687. func (b *Broker) updateRequestLatencyMetrics(requestLatency time.Duration) {
  688. requestLatencyInMs := int64(requestLatency / time.Millisecond)
  689. b.requestLatency.Update(requestLatencyInMs)
  690. if b.brokerRequestLatency != nil {
  691. b.brokerRequestLatency.Update(requestLatencyInMs)
  692. }
  693. }
  694. func (b *Broker) updateOutgoingCommunicationMetrics(bytes int) {
  695. b.requestRate.Mark(1)
  696. if b.brokerRequestRate != nil {
  697. b.brokerRequestRate.Mark(1)
  698. }
  699. requestSize := int64(bytes)
  700. b.outgoingByteRate.Mark(requestSize)
  701. if b.brokerOutgoingByteRate != nil {
  702. b.brokerOutgoingByteRate.Mark(requestSize)
  703. }
  704. b.requestSize.Update(requestSize)
  705. if b.brokerRequestSize != nil {
  706. b.brokerRequestSize.Update(requestSize)
  707. }
  708. }