broker.go 28 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102
  1. package sarama
  2. import (
  3. "crypto/tls"
  4. "encoding/binary"
  5. "fmt"
  6. "io"
  7. "net"
  8. "sort"
  9. "strconv"
  10. "strings"
  11. "sync"
  12. "sync/atomic"
  13. "time"
  14. "github.com/rcrowley/go-metrics"
  15. )
// Broker represents a single Kafka broker connection. All operations on this object are entirely concurrency-safe.
type Broker struct {
	// id is the broker ID; NewBroker initialises it to -1 and decode overwrites
	// it with the value read from the wire.
	id int32
	// addr is the host:port address used when dialing the broker.
	addr string
	// rack is the optional rack read by decode for version >= 1; nil when unknown.
	rack *string

	// conf is the configuration stored by Open once the connection is dialed.
	conf *Config
	// correlationID is stamped on each outgoing request and incremented after
	// every successful write.
	correlationID int32
	// conn is the live connection, or nil while the broker is not connected.
	conn net.Conn
	// connErr records the error of the most recent failed connection attempt.
	connErr error
	// lock guards the connection state; Open holds it until the asynchronous
	// connection attempt has finished.
	lock sync.Mutex
	// opened is accessed atomically: 1 once Open has been claimed, 0 otherwise.
	opened int32

	// responses queues the promises of in-flight requests for responseReceiver.
	responses chan responsePromise
	// done is closed by responseReceiver after responses has been closed and drained.
	done chan bool

	// Metrics shared between all brokers that use the same registry.
	incomingByteRate metrics.Meter
	requestRate      metrics.Meter
	requestSize      metrics.Histogram
	requestLatency   metrics.Histogram
	outgoingByteRate metrics.Meter
	responseRate     metrics.Meter
	responseSize     metrics.Histogram

	// Per-broker metrics; Open only registers them when id >= 0.
	brokerIncomingByteRate metrics.Meter
	brokerRequestRate      metrics.Meter
	brokerRequestSize      metrics.Histogram
	brokerRequestLatency   metrics.Histogram
	brokerOutgoingByteRate metrics.Meter
	brokerResponseRate     metrics.Meter
	brokerResponseSize     metrics.Histogram
}
// SASLMechanism specifies the SASL mechanism the client uses to authenticate
// with the broker. It is a plain string type; the mechanism names declared in
// this file are SASLTypeOAuth and SASLTypePlaintext.
type SASLMechanism string
const (
	// SASLTypeOAuth represents the SASL/OAUTHBEARER mechanism (Kafka 2.0.0+)
	SASLTypeOAuth = "OAUTHBEARER"
	// SASLTypePlaintext represents the SASL/PLAIN mechanism
	SASLTypePlaintext = "PLAIN"
	// SASLHandshakeV0 is v0 of the Kafka SASL handshake protocol. Client and
	// server negotiate SASL auth using opaque packets.
	SASLHandshakeV0 = int16(0)
	// SASLHandshakeV1 is v1 of the Kafka SASL handshake protocol. Client and
	// server negotiate SASL by wrapping tokens with Kafka protocol headers.
	SASLHandshakeV1 = int16(1)
	// SASLExtKeyAuth is the reserved extension key name sent as part of the
	// SASL/OAUTHBEARER initial client response. Tokens whose Extensions
	// contain this key are rejected by buildClientInitialResponse.
	SASLExtKeyAuth = "auth"
)
// AccessToken contains an access token used to authenticate a
// SASL/OAUTHBEARER client along with associated metadata.
type AccessToken struct {
	// Token is the access token payload.
	Token string
	// Extensions is an optional map of arbitrary key-value pairs that can be
	// sent with the SASL/OAUTHBEARER initial client response. These values are
	// ignored by the SASL server if they are unexpected. This feature is only
	// supported by Kafka >= 2.1.0.
	Extensions map[string]string
}
// AccessTokenProvider is the interface that encapsulates how implementors
// can generate access tokens for Kafka broker authentication.
type AccessTokenProvider interface {
	// Token returns an access token. The implementation should ensure token
	// reuse so that multiple calls at connect time do not create multiple
	// tokens. The implementation should also periodically refresh the token in
	// order to guarantee that each call returns an unexpired token. This
	// method should not block indefinitely--a timeout error should be returned
	// after a short period of inactivity so that the broker connection logic
	// can log debugging information and retry.
	//
	// A non-nil error aborts the SASL/OAUTHBEARER authentication attempt.
	Token() (*AccessToken, error)
}
// responsePromise pairs an in-flight request with the channels on which
// responseReceiver delivers its outcome. The field order matters: send builds
// this struct with a positional literal.
type responsePromise struct {
	// requestTime is the instant the request was written; it is used to
	// compute the request latency once the response header has been read.
	requestTime time.Time
	// correlationID is the ID the matching response header must carry.
	correlationID int32
	// packets receives the raw response body on success.
	packets chan []byte
	// errors receives the failure when the response could not be read.
	errors chan error
}
  90. // NewBroker creates and returns a Broker targeting the given host:port address.
  91. // This does not attempt to actually connect, you have to call Open() for that.
  92. func NewBroker(addr string) *Broker {
  93. return &Broker{id: -1, addr: addr}
  94. }
  95. // Open tries to connect to the Broker if it is not already connected or connecting, but does not block
  96. // waiting for the connection to complete. This means that any subsequent operations on the broker will
  97. // block waiting for the connection to succeed or fail. To get the effect of a fully synchronous Open call,
  98. // follow it by a call to Connected(). The only errors Open will return directly are ConfigurationError or
  99. // AlreadyConnected. If conf is nil, the result of NewConfig() is used.
  100. func (b *Broker) Open(conf *Config) error {
  101. if !atomic.CompareAndSwapInt32(&b.opened, 0, 1) {
  102. return ErrAlreadyConnected
  103. }
  104. if conf == nil {
  105. conf = NewConfig()
  106. }
  107. err := conf.Validate()
  108. if err != nil {
  109. return err
  110. }
  111. b.lock.Lock()
  112. go withRecover(func() {
  113. defer b.lock.Unlock()
  114. dialer := net.Dialer{
  115. Timeout: conf.Net.DialTimeout,
  116. KeepAlive: conf.Net.KeepAlive,
  117. LocalAddr: conf.Net.LocalAddr,
  118. }
  119. if conf.Net.TLS.Enable {
  120. b.conn, b.connErr = tls.DialWithDialer(&dialer, "tcp", b.addr, conf.Net.TLS.Config)
  121. } else {
  122. b.conn, b.connErr = dialer.Dial("tcp", b.addr)
  123. }
  124. if b.connErr != nil {
  125. Logger.Printf("Failed to connect to broker %s: %s\n", b.addr, b.connErr)
  126. b.conn = nil
  127. atomic.StoreInt32(&b.opened, 0)
  128. return
  129. }
  130. b.conn = newBufConn(b.conn)
  131. b.conf = conf
  132. // Create or reuse the global metrics shared between brokers
  133. b.incomingByteRate = metrics.GetOrRegisterMeter("incoming-byte-rate", conf.MetricRegistry)
  134. b.requestRate = metrics.GetOrRegisterMeter("request-rate", conf.MetricRegistry)
  135. b.requestSize = getOrRegisterHistogram("request-size", conf.MetricRegistry)
  136. b.requestLatency = getOrRegisterHistogram("request-latency-in-ms", conf.MetricRegistry)
  137. b.outgoingByteRate = metrics.GetOrRegisterMeter("outgoing-byte-rate", conf.MetricRegistry)
  138. b.responseRate = metrics.GetOrRegisterMeter("response-rate", conf.MetricRegistry)
  139. b.responseSize = getOrRegisterHistogram("response-size", conf.MetricRegistry)
  140. // Do not gather metrics for seeded broker (only used during bootstrap) because they share
  141. // the same id (-1) and are already exposed through the global metrics above
  142. if b.id >= 0 {
  143. b.brokerIncomingByteRate = getOrRegisterBrokerMeter("incoming-byte-rate", b, conf.MetricRegistry)
  144. b.brokerRequestRate = getOrRegisterBrokerMeter("request-rate", b, conf.MetricRegistry)
  145. b.brokerRequestSize = getOrRegisterBrokerHistogram("request-size", b, conf.MetricRegistry)
  146. b.brokerRequestLatency = getOrRegisterBrokerHistogram("request-latency-in-ms", b, conf.MetricRegistry)
  147. b.brokerOutgoingByteRate = getOrRegisterBrokerMeter("outgoing-byte-rate", b, conf.MetricRegistry)
  148. b.brokerResponseRate = getOrRegisterBrokerMeter("response-rate", b, conf.MetricRegistry)
  149. b.brokerResponseSize = getOrRegisterBrokerHistogram("response-size", b, conf.MetricRegistry)
  150. }
  151. if conf.Net.SASL.Enable {
  152. b.connErr = b.authenticateViaSASL()
  153. if b.connErr != nil {
  154. err = b.conn.Close()
  155. if err == nil {
  156. Logger.Printf("Closed connection to broker %s\n", b.addr)
  157. } else {
  158. Logger.Printf("Error while closing connection to broker %s: %s\n", b.addr, err)
  159. }
  160. b.conn = nil
  161. atomic.StoreInt32(&b.opened, 0)
  162. return
  163. }
  164. }
  165. b.done = make(chan bool)
  166. b.responses = make(chan responsePromise, b.conf.Net.MaxOpenRequests-1)
  167. if b.id >= 0 {
  168. Logger.Printf("Connected to broker at %s (registered as #%d)\n", b.addr, b.id)
  169. } else {
  170. Logger.Printf("Connected to broker at %s (unregistered)\n", b.addr)
  171. }
  172. go withRecover(b.responseReceiver)
  173. })
  174. return nil
  175. }
  176. // Connected returns true if the broker is connected and false otherwise. If the broker is not
  177. // connected but it had tried to connect, the error from that connection attempt is also returned.
  178. func (b *Broker) Connected() (bool, error) {
  179. b.lock.Lock()
  180. defer b.lock.Unlock()
  181. return b.conn != nil, b.connErr
  182. }
  183. func (b *Broker) Close() error {
  184. b.lock.Lock()
  185. defer b.lock.Unlock()
  186. if b.conn == nil {
  187. return ErrNotConnected
  188. }
  189. close(b.responses)
  190. <-b.done
  191. err := b.conn.Close()
  192. b.conn = nil
  193. b.connErr = nil
  194. b.done = nil
  195. b.responses = nil
  196. if b.id >= 0 {
  197. b.conf.MetricRegistry.Unregister(getMetricNameForBroker("incoming-byte-rate", b))
  198. b.conf.MetricRegistry.Unregister(getMetricNameForBroker("request-rate", b))
  199. b.conf.MetricRegistry.Unregister(getMetricNameForBroker("outgoing-byte-rate", b))
  200. b.conf.MetricRegistry.Unregister(getMetricNameForBroker("response-rate", b))
  201. }
  202. if err == nil {
  203. Logger.Printf("Closed connection to broker %s\n", b.addr)
  204. } else {
  205. Logger.Printf("Error while closing connection to broker %s: %s\n", b.addr, err)
  206. }
  207. atomic.StoreInt32(&b.opened, 0)
  208. return err
  209. }
  210. // ID returns the broker ID retrieved from Kafka's metadata, or -1 if that is not known.
  211. func (b *Broker) ID() int32 {
  212. return b.id
  213. }
  214. // Addr returns the broker address as either retrieved from Kafka's metadata or passed to NewBroker.
  215. func (b *Broker) Addr() string {
  216. return b.addr
  217. }
  218. // Rack returns the broker's rack as retrieved from Kafka's metadata or the
  219. // empty string if it is not known. The returned value corresponds to the
  220. // broker's broker.rack configuration setting. Requires protocol version to be
  221. // at least v0.10.0.0.
  222. func (b *Broker) Rack() string {
  223. if b.rack == nil {
  224. return ""
  225. }
  226. return *b.rack
  227. }
  228. func (b *Broker) GetMetadata(request *MetadataRequest) (*MetadataResponse, error) {
  229. response := new(MetadataResponse)
  230. err := b.sendAndReceive(request, response)
  231. if err != nil {
  232. return nil, err
  233. }
  234. return response, nil
  235. }
  236. func (b *Broker) GetConsumerMetadata(request *ConsumerMetadataRequest) (*ConsumerMetadataResponse, error) {
  237. response := new(ConsumerMetadataResponse)
  238. err := b.sendAndReceive(request, response)
  239. if err != nil {
  240. return nil, err
  241. }
  242. return response, nil
  243. }
  244. func (b *Broker) FindCoordinator(request *FindCoordinatorRequest) (*FindCoordinatorResponse, error) {
  245. response := new(FindCoordinatorResponse)
  246. err := b.sendAndReceive(request, response)
  247. if err != nil {
  248. return nil, err
  249. }
  250. return response, nil
  251. }
  252. func (b *Broker) GetAvailableOffsets(request *OffsetRequest) (*OffsetResponse, error) {
  253. response := new(OffsetResponse)
  254. err := b.sendAndReceive(request, response)
  255. if err != nil {
  256. return nil, err
  257. }
  258. return response, nil
  259. }
  260. func (b *Broker) Produce(request *ProduceRequest) (*ProduceResponse, error) {
  261. var response *ProduceResponse
  262. var err error
  263. if request.RequiredAcks == NoResponse {
  264. err = b.sendAndReceive(request, nil)
  265. } else {
  266. response = new(ProduceResponse)
  267. err = b.sendAndReceive(request, response)
  268. }
  269. if err != nil {
  270. return nil, err
  271. }
  272. return response, nil
  273. }
  274. func (b *Broker) Fetch(request *FetchRequest) (*FetchResponse, error) {
  275. response := new(FetchResponse)
  276. err := b.sendAndReceive(request, response)
  277. if err != nil {
  278. return nil, err
  279. }
  280. return response, nil
  281. }
  282. func (b *Broker) CommitOffset(request *OffsetCommitRequest) (*OffsetCommitResponse, error) {
  283. response := new(OffsetCommitResponse)
  284. err := b.sendAndReceive(request, response)
  285. if err != nil {
  286. return nil, err
  287. }
  288. return response, nil
  289. }
  290. func (b *Broker) FetchOffset(request *OffsetFetchRequest) (*OffsetFetchResponse, error) {
  291. response := new(OffsetFetchResponse)
  292. err := b.sendAndReceive(request, response)
  293. if err != nil {
  294. return nil, err
  295. }
  296. return response, nil
  297. }
  298. func (b *Broker) JoinGroup(request *JoinGroupRequest) (*JoinGroupResponse, error) {
  299. response := new(JoinGroupResponse)
  300. err := b.sendAndReceive(request, response)
  301. if err != nil {
  302. return nil, err
  303. }
  304. return response, nil
  305. }
  306. func (b *Broker) SyncGroup(request *SyncGroupRequest) (*SyncGroupResponse, error) {
  307. response := new(SyncGroupResponse)
  308. err := b.sendAndReceive(request, response)
  309. if err != nil {
  310. return nil, err
  311. }
  312. return response, nil
  313. }
  314. func (b *Broker) LeaveGroup(request *LeaveGroupRequest) (*LeaveGroupResponse, error) {
  315. response := new(LeaveGroupResponse)
  316. err := b.sendAndReceive(request, response)
  317. if err != nil {
  318. return nil, err
  319. }
  320. return response, nil
  321. }
  322. func (b *Broker) Heartbeat(request *HeartbeatRequest) (*HeartbeatResponse, error) {
  323. response := new(HeartbeatResponse)
  324. err := b.sendAndReceive(request, response)
  325. if err != nil {
  326. return nil, err
  327. }
  328. return response, nil
  329. }
  330. func (b *Broker) ListGroups(request *ListGroupsRequest) (*ListGroupsResponse, error) {
  331. response := new(ListGroupsResponse)
  332. err := b.sendAndReceive(request, response)
  333. if err != nil {
  334. return nil, err
  335. }
  336. return response, nil
  337. }
  338. func (b *Broker) DescribeGroups(request *DescribeGroupsRequest) (*DescribeGroupsResponse, error) {
  339. response := new(DescribeGroupsResponse)
  340. err := b.sendAndReceive(request, response)
  341. if err != nil {
  342. return nil, err
  343. }
  344. return response, nil
  345. }
  346. func (b *Broker) ApiVersions(request *ApiVersionsRequest) (*ApiVersionsResponse, error) {
  347. response := new(ApiVersionsResponse)
  348. err := b.sendAndReceive(request, response)
  349. if err != nil {
  350. return nil, err
  351. }
  352. return response, nil
  353. }
  354. func (b *Broker) CreateTopics(request *CreateTopicsRequest) (*CreateTopicsResponse, error) {
  355. response := new(CreateTopicsResponse)
  356. err := b.sendAndReceive(request, response)
  357. if err != nil {
  358. return nil, err
  359. }
  360. return response, nil
  361. }
  362. func (b *Broker) DeleteTopics(request *DeleteTopicsRequest) (*DeleteTopicsResponse, error) {
  363. response := new(DeleteTopicsResponse)
  364. err := b.sendAndReceive(request, response)
  365. if err != nil {
  366. return nil, err
  367. }
  368. return response, nil
  369. }
  370. func (b *Broker) CreatePartitions(request *CreatePartitionsRequest) (*CreatePartitionsResponse, error) {
  371. response := new(CreatePartitionsResponse)
  372. err := b.sendAndReceive(request, response)
  373. if err != nil {
  374. return nil, err
  375. }
  376. return response, nil
  377. }
  378. func (b *Broker) DeleteRecords(request *DeleteRecordsRequest) (*DeleteRecordsResponse, error) {
  379. response := new(DeleteRecordsResponse)
  380. err := b.sendAndReceive(request, response)
  381. if err != nil {
  382. return nil, err
  383. }
  384. return response, nil
  385. }
  386. func (b *Broker) DescribeAcls(request *DescribeAclsRequest) (*DescribeAclsResponse, error) {
  387. response := new(DescribeAclsResponse)
  388. err := b.sendAndReceive(request, response)
  389. if err != nil {
  390. return nil, err
  391. }
  392. return response, nil
  393. }
  394. func (b *Broker) CreateAcls(request *CreateAclsRequest) (*CreateAclsResponse, error) {
  395. response := new(CreateAclsResponse)
  396. err := b.sendAndReceive(request, response)
  397. if err != nil {
  398. return nil, err
  399. }
  400. return response, nil
  401. }
  402. func (b *Broker) DeleteAcls(request *DeleteAclsRequest) (*DeleteAclsResponse, error) {
  403. response := new(DeleteAclsResponse)
  404. err := b.sendAndReceive(request, response)
  405. if err != nil {
  406. return nil, err
  407. }
  408. return response, nil
  409. }
  410. func (b *Broker) InitProducerID(request *InitProducerIDRequest) (*InitProducerIDResponse, error) {
  411. response := new(InitProducerIDResponse)
  412. err := b.sendAndReceive(request, response)
  413. if err != nil {
  414. return nil, err
  415. }
  416. return response, nil
  417. }
  418. func (b *Broker) AddPartitionsToTxn(request *AddPartitionsToTxnRequest) (*AddPartitionsToTxnResponse, error) {
  419. response := new(AddPartitionsToTxnResponse)
  420. err := b.sendAndReceive(request, response)
  421. if err != nil {
  422. return nil, err
  423. }
  424. return response, nil
  425. }
  426. func (b *Broker) AddOffsetsToTxn(request *AddOffsetsToTxnRequest) (*AddOffsetsToTxnResponse, error) {
  427. response := new(AddOffsetsToTxnResponse)
  428. err := b.sendAndReceive(request, response)
  429. if err != nil {
  430. return nil, err
  431. }
  432. return response, nil
  433. }
  434. func (b *Broker) EndTxn(request *EndTxnRequest) (*EndTxnResponse, error) {
  435. response := new(EndTxnResponse)
  436. err := b.sendAndReceive(request, response)
  437. if err != nil {
  438. return nil, err
  439. }
  440. return response, nil
  441. }
  442. func (b *Broker) TxnOffsetCommit(request *TxnOffsetCommitRequest) (*TxnOffsetCommitResponse, error) {
  443. response := new(TxnOffsetCommitResponse)
  444. err := b.sendAndReceive(request, response)
  445. if err != nil {
  446. return nil, err
  447. }
  448. return response, nil
  449. }
  450. func (b *Broker) DescribeConfigs(request *DescribeConfigsRequest) (*DescribeConfigsResponse, error) {
  451. response := new(DescribeConfigsResponse)
  452. err := b.sendAndReceive(request, response)
  453. if err != nil {
  454. return nil, err
  455. }
  456. return response, nil
  457. }
  458. func (b *Broker) AlterConfigs(request *AlterConfigsRequest) (*AlterConfigsResponse, error) {
  459. response := new(AlterConfigsResponse)
  460. err := b.sendAndReceive(request, response)
  461. if err != nil {
  462. return nil, err
  463. }
  464. return response, nil
  465. }
  466. func (b *Broker) DeleteGroups(request *DeleteGroupsRequest) (*DeleteGroupsResponse, error) {
  467. response := new(DeleteGroupsResponse)
  468. if err := b.sendAndReceive(request, response); err != nil {
  469. return nil, err
  470. }
  471. return response, nil
  472. }
  473. func (b *Broker) send(rb protocolBody, promiseResponse bool) (*responsePromise, error) {
  474. b.lock.Lock()
  475. defer b.lock.Unlock()
  476. if b.conn == nil {
  477. if b.connErr != nil {
  478. return nil, b.connErr
  479. }
  480. return nil, ErrNotConnected
  481. }
  482. if !b.conf.Version.IsAtLeast(rb.requiredVersion()) {
  483. return nil, ErrUnsupportedVersion
  484. }
  485. req := &request{correlationID: b.correlationID, clientID: b.conf.ClientID, body: rb}
  486. buf, err := encode(req, b.conf.MetricRegistry)
  487. if err != nil {
  488. return nil, err
  489. }
  490. err = b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout))
  491. if err != nil {
  492. return nil, err
  493. }
  494. requestTime := time.Now()
  495. bytes, err := b.conn.Write(buf)
  496. b.updateOutgoingCommunicationMetrics(bytes)
  497. if err != nil {
  498. return nil, err
  499. }
  500. b.correlationID++
  501. if !promiseResponse {
  502. // Record request latency without the response
  503. b.updateRequestLatencyMetrics(time.Since(requestTime))
  504. return nil, nil
  505. }
  506. promise := responsePromise{requestTime, req.correlationID, make(chan []byte), make(chan error)}
  507. b.responses <- promise
  508. return &promise, nil
  509. }
  510. func (b *Broker) sendAndReceive(req protocolBody, res versionedDecoder) error {
  511. promise, err := b.send(req, res != nil)
  512. if err != nil {
  513. return err
  514. }
  515. if promise == nil {
  516. return nil
  517. }
  518. select {
  519. case buf := <-promise.packets:
  520. return versionedDecode(buf, res, req.version())
  521. case err = <-promise.errors:
  522. return err
  523. }
  524. }
  525. func (b *Broker) decode(pd packetDecoder, version int16) (err error) {
  526. b.id, err = pd.getInt32()
  527. if err != nil {
  528. return err
  529. }
  530. host, err := pd.getString()
  531. if err != nil {
  532. return err
  533. }
  534. port, err := pd.getInt32()
  535. if err != nil {
  536. return err
  537. }
  538. if version >= 1 {
  539. b.rack, err = pd.getNullableString()
  540. if err != nil {
  541. return err
  542. }
  543. }
  544. b.addr = net.JoinHostPort(host, fmt.Sprint(port))
  545. if _, _, err := net.SplitHostPort(b.addr); err != nil {
  546. return err
  547. }
  548. return nil
  549. }
  550. func (b *Broker) encode(pe packetEncoder, version int16) (err error) {
  551. host, portstr, err := net.SplitHostPort(b.addr)
  552. if err != nil {
  553. return err
  554. }
  555. port, err := strconv.Atoi(portstr)
  556. if err != nil {
  557. return err
  558. }
  559. pe.putInt32(b.id)
  560. err = pe.putString(host)
  561. if err != nil {
  562. return err
  563. }
  564. pe.putInt32(int32(port))
  565. if version >= 1 {
  566. err = pe.putNullableString(b.rack)
  567. if err != nil {
  568. return err
  569. }
  570. }
  571. return nil
  572. }
  573. func (b *Broker) responseReceiver() {
  574. var dead error
  575. header := make([]byte, 8)
  576. for response := range b.responses {
  577. if dead != nil {
  578. response.errors <- dead
  579. continue
  580. }
  581. err := b.conn.SetReadDeadline(time.Now().Add(b.conf.Net.ReadTimeout))
  582. if err != nil {
  583. dead = err
  584. response.errors <- err
  585. continue
  586. }
  587. bytesReadHeader, err := io.ReadFull(b.conn, header)
  588. requestLatency := time.Since(response.requestTime)
  589. if err != nil {
  590. b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
  591. dead = err
  592. response.errors <- err
  593. continue
  594. }
  595. decodedHeader := responseHeader{}
  596. err = decode(header, &decodedHeader)
  597. if err != nil {
  598. b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
  599. dead = err
  600. response.errors <- err
  601. continue
  602. }
  603. if decodedHeader.correlationID != response.correlationID {
  604. b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
  605. // TODO if decoded ID < cur ID, discard until we catch up
  606. // TODO if decoded ID > cur ID, save it so when cur ID catches up we have a response
  607. dead = PacketDecodingError{fmt.Sprintf("correlation ID didn't match, wanted %d, got %d", response.correlationID, decodedHeader.correlationID)}
  608. response.errors <- dead
  609. continue
  610. }
  611. buf := make([]byte, decodedHeader.length-4)
  612. bytesReadBody, err := io.ReadFull(b.conn, buf)
  613. b.updateIncomingCommunicationMetrics(bytesReadHeader+bytesReadBody, requestLatency)
  614. if err != nil {
  615. dead = err
  616. response.errors <- err
  617. continue
  618. }
  619. response.packets <- buf
  620. }
  621. close(b.done)
  622. }
  623. func (b *Broker) authenticateViaSASL() error {
  624. if b.conf.Net.SASL.Mechanism == SASLTypeOAuth {
  625. return b.sendAndReceiveSASLOAuth(b.conf.Net.SASL.TokenProvider)
  626. }
  627. return b.sendAndReceiveSASLPlainAuth()
  628. }
  629. func (b *Broker) sendAndReceiveSASLHandshake(saslType string, version int16) error {
  630. rb := &SaslHandshakeRequest{Mechanism: saslType, Version: version}
  631. req := &request{correlationID: b.correlationID, clientID: b.conf.ClientID, body: rb}
  632. buf, err := encode(req, b.conf.MetricRegistry)
  633. if err != nil {
  634. return err
  635. }
  636. err = b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout))
  637. if err != nil {
  638. return err
  639. }
  640. requestTime := time.Now()
  641. bytes, err := b.conn.Write(buf)
  642. b.updateOutgoingCommunicationMetrics(bytes)
  643. if err != nil {
  644. Logger.Printf("Failed to send SASL handshake %s: %s\n", b.addr, err.Error())
  645. return err
  646. }
  647. b.correlationID++
  648. //wait for the response
  649. header := make([]byte, 8) // response header
  650. _, err = io.ReadFull(b.conn, header)
  651. if err != nil {
  652. Logger.Printf("Failed to read SASL handshake header : %s\n", err.Error())
  653. return err
  654. }
  655. length := binary.BigEndian.Uint32(header[:4])
  656. payload := make([]byte, length-4)
  657. n, err := io.ReadFull(b.conn, payload)
  658. if err != nil {
  659. Logger.Printf("Failed to read SASL handshake payload : %s\n", err.Error())
  660. return err
  661. }
  662. b.updateIncomingCommunicationMetrics(n+8, time.Since(requestTime))
  663. res := &SaslHandshakeResponse{}
  664. err = versionedDecode(payload, res, 0)
  665. if err != nil {
  666. Logger.Printf("Failed to parse SASL handshake : %s\n", err.Error())
  667. return err
  668. }
  669. if res.Err != ErrNoError {
  670. Logger.Printf("Invalid SASL Mechanism : %s\n", res.Err.Error())
  671. return res.Err
  672. }
  673. Logger.Print("Successful SASL handshake")
  674. return nil
  675. }
  676. // Kafka 0.10.0 plans to support SASL Plain and Kerberos as per PR #812 (KIP-43)/(JIRA KAFKA-3149)
  677. // Some hosted kafka services such as IBM Message Hub already offer SASL/PLAIN auth with Kafka 0.9
  678. //
  679. // In SASL Plain, Kafka expects the auth header to be in the following format
  680. // Message format (from https://tools.ietf.org/html/rfc4616):
  681. //
  682. // message = [authzid] UTF8NUL authcid UTF8NUL passwd
  683. // authcid = 1*SAFE ; MUST accept up to 255 octets
  684. // authzid = 1*SAFE ; MUST accept up to 255 octets
  685. // passwd = 1*SAFE ; MUST accept up to 255 octets
  686. // UTF8NUL = %x00 ; UTF-8 encoded NUL character
  687. //
  688. // SAFE = UTF1 / UTF2 / UTF3 / UTF4
  689. // ;; any UTF-8 encoded Unicode character except NUL
  690. //
  691. // When credentials are valid, Kafka returns a 4 byte array of null characters.
  692. // When credentials are invalid, Kafka closes the connection. This does not seem to be the ideal way
  693. // of responding to bad credentials but thats how its being done today.
  694. func (b *Broker) sendAndReceiveSASLPlainAuth() error {
  695. if b.conf.Net.SASL.Handshake {
  696. handshakeErr := b.sendAndReceiveSASLHandshake(SASLTypePlaintext, SASLHandshakeV0)
  697. if handshakeErr != nil {
  698. Logger.Printf("Error while performing SASL handshake %s\n", b.addr)
  699. return handshakeErr
  700. }
  701. }
  702. length := 1 + len(b.conf.Net.SASL.User) + 1 + len(b.conf.Net.SASL.Password)
  703. authBytes := make([]byte, length+4) //4 byte length header + auth data
  704. binary.BigEndian.PutUint32(authBytes, uint32(length))
  705. copy(authBytes[4:], []byte("\x00"+b.conf.Net.SASL.User+"\x00"+b.conf.Net.SASL.Password))
  706. err := b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout))
  707. if err != nil {
  708. Logger.Printf("Failed to set write deadline when doing SASL auth with broker %s: %s\n", b.addr, err.Error())
  709. return err
  710. }
  711. requestTime := time.Now()
  712. bytesWritten, err := b.conn.Write(authBytes)
  713. b.updateOutgoingCommunicationMetrics(bytesWritten)
  714. if err != nil {
  715. Logger.Printf("Failed to write SASL auth header to broker %s: %s\n", b.addr, err.Error())
  716. return err
  717. }
  718. header := make([]byte, 4)
  719. n, err := io.ReadFull(b.conn, header)
  720. b.updateIncomingCommunicationMetrics(n, time.Since(requestTime))
  721. // If the credentials are valid, we would get a 4 byte response filled with null characters.
  722. // Otherwise, the broker closes the connection and we get an EOF
  723. if err != nil {
  724. Logger.Printf("Failed to read response while authenticating with SASL to broker %s: %s\n", b.addr, err.Error())
  725. return err
  726. }
  727. Logger.Printf("SASL authentication successful with broker %s:%v - %v\n", b.addr, n, header)
  728. return nil
  729. }
  730. // sendAndReceiveSASLOAuth performs the authentication flow as described by KIP-255
  731. // https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=75968876
  732. func (b *Broker) sendAndReceiveSASLOAuth(provider AccessTokenProvider) error {
  733. if err := b.sendAndReceiveSASLHandshake(SASLTypeOAuth, SASLHandshakeV1); err != nil {
  734. return err
  735. }
  736. token, err := provider.Token()
  737. if err != nil {
  738. return err
  739. }
  740. requestTime := time.Now()
  741. correlationID := b.correlationID
  742. bytesWritten, err := b.sendSASLOAuthBearerClientResponse(token, correlationID)
  743. if err != nil {
  744. return err
  745. }
  746. b.updateOutgoingCommunicationMetrics(bytesWritten)
  747. b.correlationID++
  748. bytesRead, err := b.receiveSASLOAuthBearerServerResponse(correlationID)
  749. if err != nil {
  750. return err
  751. }
  752. requestLatency := time.Since(requestTime)
  753. b.updateIncomingCommunicationMetrics(bytesRead, requestLatency)
  754. return nil
  755. }
  756. // Build SASL/OAUTHBEARER initial client response as described by RFC-7628
  757. // https://tools.ietf.org/html/rfc7628
  758. func buildClientInitialResponse(token *AccessToken) ([]byte, error) {
  759. var ext string
  760. if token.Extensions != nil && len(token.Extensions) > 0 {
  761. if _, ok := token.Extensions[SASLExtKeyAuth]; ok {
  762. return []byte{}, fmt.Errorf("The extension `%s` is invalid", SASLExtKeyAuth)
  763. }
  764. ext = "\x01" + mapToString(token.Extensions, "=", "\x01")
  765. }
  766. resp := []byte(fmt.Sprintf("n,,\x01auth=Bearer %s%s\x01\x01", token.Token, ext))
  767. return resp, nil
  768. }
  769. // mapToString returns a list of key-value pairs ordered by key.
  770. // keyValSep separates the key from the value. elemSep separates each pair.
  771. func mapToString(extensions map[string]string, keyValSep string, elemSep string) string {
  772. buf := make([]string, 0, len(extensions))
  773. for k, v := range extensions {
  774. buf = append(buf, k+keyValSep+v)
  775. }
  776. sort.Strings(buf)
  777. return strings.Join(buf, elemSep)
  778. }
  779. func (b *Broker) sendSASLOAuthBearerClientResponse(token *AccessToken, correlationID int32) (int, error) {
  780. initialResp, err := buildClientInitialResponse(token)
  781. if err != nil {
  782. return 0, err
  783. }
  784. rb := &SaslAuthenticateRequest{initialResp}
  785. req := &request{correlationID: correlationID, clientID: b.conf.ClientID, body: rb}
  786. buf, err := encode(req, b.conf.MetricRegistry)
  787. if err != nil {
  788. return 0, err
  789. }
  790. if err := b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout)); err != nil {
  791. return 0, err
  792. }
  793. return b.conn.Write(buf)
  794. }
  795. func (b *Broker) receiveSASLOAuthBearerServerResponse(correlationID int32) (int, error) {
  796. buf := make([]byte, 8)
  797. bytesRead, err := io.ReadFull(b.conn, buf)
  798. if err != nil {
  799. return bytesRead, err
  800. }
  801. header := responseHeader{}
  802. err = decode(buf, &header)
  803. if err != nil {
  804. return bytesRead, err
  805. }
  806. if header.correlationID != correlationID {
  807. return bytesRead, fmt.Errorf("correlation ID didn't match, wanted %d, got %d", b.correlationID, header.correlationID)
  808. }
  809. buf = make([]byte, header.length-4)
  810. c, err := io.ReadFull(b.conn, buf)
  811. bytesRead += c
  812. if err != nil {
  813. return bytesRead, err
  814. }
  815. res := &SaslAuthenticateResponse{}
  816. if err := versionedDecode(buf, res, 0); err != nil {
  817. return bytesRead, err
  818. }
  819. if err != nil {
  820. return bytesRead, err
  821. }
  822. if res.Err != ErrNoError {
  823. return bytesRead, res.Err
  824. }
  825. if len(res.SaslAuthBytes) > 0 {
  826. Logger.Printf("Received SASL auth response: %s", res.SaslAuthBytes)
  827. }
  828. return bytesRead, nil
  829. }
  830. func (b *Broker) updateIncomingCommunicationMetrics(bytes int, requestLatency time.Duration) {
  831. b.updateRequestLatencyMetrics(requestLatency)
  832. b.responseRate.Mark(1)
  833. if b.brokerResponseRate != nil {
  834. b.brokerResponseRate.Mark(1)
  835. }
  836. responseSize := int64(bytes)
  837. b.incomingByteRate.Mark(responseSize)
  838. if b.brokerIncomingByteRate != nil {
  839. b.brokerIncomingByteRate.Mark(responseSize)
  840. }
  841. b.responseSize.Update(responseSize)
  842. if b.brokerResponseSize != nil {
  843. b.brokerResponseSize.Update(responseSize)
  844. }
  845. }
  846. func (b *Broker) updateRequestLatencyMetrics(requestLatency time.Duration) {
  847. requestLatencyInMs := int64(requestLatency / time.Millisecond)
  848. b.requestLatency.Update(requestLatencyInMs)
  849. if b.brokerRequestLatency != nil {
  850. b.brokerRequestLatency.Update(requestLatencyInMs)
  851. }
  852. }
  853. func (b *Broker) updateOutgoingCommunicationMetrics(bytes int) {
  854. b.requestRate.Mark(1)
  855. if b.brokerRequestRate != nil {
  856. b.brokerRequestRate.Mark(1)
  857. }
  858. requestSize := int64(bytes)
  859. b.outgoingByteRate.Mark(requestSize)
  860. if b.brokerOutgoingByteRate != nil {
  861. b.brokerOutgoingByteRate.Mark(requestSize)
  862. }
  863. b.requestSize.Update(requestSize)
  864. if b.brokerRequestSize != nil {
  865. b.brokerRequestSize.Update(requestSize)
  866. }
  867. }