broker.go 39 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388
  1. package sarama
  2. import (
  3. "crypto/tls"
  4. "encoding/binary"
  5. "fmt"
  6. "io"
  7. "net"
  8. "sort"
  9. "strconv"
  10. "strings"
  11. "sync"
  12. "sync/atomic"
  13. "time"
  14. metrics "github.com/rcrowley/go-metrics"
  15. )
  16. // Broker represents a single Kafka broker connection. All operations on this object are entirely concurrency-safe.
  17. type Broker struct {
  18. conf *Config // configuration in use; assigned by Open once the dial has succeeded
  19. rack *string // rack decoded from metadata (version >= 1); nil when unknown, see Rack
  20. id int32 // broker ID; NewBroker initialises it to -1 and decode overwrites it
  21. addr string // host:port address, either passed to NewBroker or rebuilt by decode
  22. correlationID int32 // correlation ID stamped on the next request; incremented after each successful write
  23. conn net.Conn // current connection, nil while not connected
  24. connErr error // error from the most recent connection/authentication attempt, if any
  25. lock sync.Mutex // held by the goroutine started in Open until the attempt finishes; also taken by Connected, Close and send
  26. opened int32 // 0 or 1; only accessed through sync/atomic
  27. responses chan responsePromise // promises for in-flight requests, consumed in order by responseReceiver
  28. done chan bool // closed by responseReceiver once responses has been closed and drained
  29. registeredMetrics []string // NOTE(review): presumably the per-broker metric names tracked for unregisterMetrics — confirm
  30. incomingByteRate metrics.Meter // this and the seven fields below are the shared metrics fetched from conf.MetricRegistry in Open
  31. requestRate metrics.Meter
  32. requestSize metrics.Histogram
  33. requestLatency metrics.Histogram
  34. outgoingByteRate metrics.Meter
  35. responseRate metrics.Meter
  36. responseSize metrics.Histogram
  37. requestsInFlight metrics.Counter
  38. brokerIncomingByteRate metrics.Meter // NOTE(review): the broker* metrics are presumably populated by registerMetrics (not visible here) — confirm
  39. brokerRequestRate metrics.Meter
  40. brokerRequestSize metrics.Histogram
  41. brokerRequestLatency metrics.Histogram
  42. brokerOutgoingByteRate metrics.Meter
  43. brokerResponseRate metrics.Meter
  44. brokerResponseSize metrics.Histogram
  45. brokerRequestsInFlight metrics.Counter
  46. kerberosAuthenticator GSSAPIKerberosAuth // authenticator configured and invoked by sendAndReceiveKerberos
  47. }
  48. // SASLMechanism specifies the SASL mechanism the client uses to authenticate with the broker.
  49. type SASLMechanism string
  50. const (
  51. // SASLTypeOAuth represents the SASL/OAUTHBEARER mechanism (Kafka 2.0.0+).
  52. SASLTypeOAuth = "OAUTHBEARER"
  53. // SASLTypePlaintext represents the SASL/PLAIN mechanism.
  54. SASLTypePlaintext = "PLAIN"
  55. // SASLTypeSCRAMSHA256 represents the SCRAM-SHA-256 mechanism.
  56. SASLTypeSCRAMSHA256 = "SCRAM-SHA-256"
  57. // SASLTypeSCRAMSHA512 represents the SCRAM-SHA-512 mechanism.
  58. SASLTypeSCRAMSHA512 = "SCRAM-SHA-512"
  59. SASLTypeGSSAPI = "GSSAPI" // SASLTypeGSSAPI selects the GSSAPI mechanism, which authenticateViaSASL routes to sendAndReceiveKerberos.
  60. // SASLHandshakeV0 is v0 of the Kafka SASL handshake protocol. Client and
  61. // server negotiate SASL auth using opaque packets.
  62. SASLHandshakeV0 = int16(0)
  63. // SASLHandshakeV1 is v1 of the Kafka SASL handshake protocol. Client and
  64. // server negotiate SASL by wrapping tokens with Kafka protocol headers.
  65. SASLHandshakeV1 = int16(1)
  66. // SASLExtKeyAuth is the reserved extension key name sent as part of the
  67. // SASL/OAUTHBEARER initial client response.
  68. SASLExtKeyAuth = "auth"
  69. )
  70. // AccessToken contains an access token used to authenticate a
  71. // SASL/OAUTHBEARER client along with associated metadata.
  72. type AccessToken struct {
  73. // Token is the access token payload.
  74. Token string
  75. // Extensions is an optional map of arbitrary key-value pairs that can be
  76. // sent with the SASL/OAUTHBEARER initial client response. These values are
  77. // ignored by the SASL server if they are unexpected. This feature is only
  78. // supported by Kafka >= 2.1.0.
  79. Extensions map[string]string
  80. }
  81. // AccessTokenProvider is the interface that encapsulates how implementors
  82. // can generate access tokens for Kafka broker authentication.
  83. type AccessTokenProvider interface {
  84. // Token returns an access token. The implementation should ensure token
  85. // reuse so that multiple calls at connect time do not create multiple
  86. // tokens. The implementation should also periodically refresh the token in
  87. // order to guarantee that each call returns an unexpired token. This
  88. // method should not block indefinitely: a timeout error should be returned
  89. // after a short period of inactivity so that the broker connection logic
  90. // can log debugging information and retry.
  91. Token() (*AccessToken, error)
  92. }
  93. // SCRAMClient is an interface to a SCRAM
  94. // client implementation.
  95. type SCRAMClient interface {
  96. // Begin prepares the client for the SCRAM exchange
  97. // with the server using a user name, a password and an authorization ID.
  98. Begin(userName, password, authzID string) error
  99. // Step steps the client through the SCRAM exchange. It is
  100. // called repeatedly until it errors or `Done` returns true.
  101. Step(challenge string) (response string, err error)
  102. // Done should return true when the SCRAM conversation
  103. // is over.
  104. Done() bool
  105. }
  // responsePromise tracks one in-flight request whose response is still
  // expected. send creates it (with a positional literal, so field order
  // matters) and responseReceiver fulfils it on exactly one of the channels.
  106. type responsePromise struct {
  107. requestTime time.Time // taken in send just before the request is written; used to compute latency
  108. correlationID int32 // correlation ID of the request, compared against the response header
  109. packets chan []byte // receives the raw response body on success
  110. errors chan error // receives the failure when reading or matching the response fails
  111. }
  112. // NewBroker creates and returns a Broker targeting the given host:port address.
  113. // This does not attempt to actually connect, you have to call Open() for that.
  114. func NewBroker(addr string) *Broker {
  115. return &Broker{id: -1, addr: addr}
  116. }
  117. // Open tries to connect to the Broker if it is not already connected or connecting, but does not block
  118. // waiting for the connection to complete. This means that any subsequent operations on the broker will
  119. // block waiting for the connection to succeed or fail. To get the effect of a fully synchronous Open call,
  120. // follow it by a call to Connected(). The only errors Open will return directly are ConfigurationError or
  121. // AlreadyConnected. If conf is nil, the result of NewConfig() is used.
  122. func (b *Broker) Open(conf *Config) error {
  123. if !atomic.CompareAndSwapInt32(&b.opened, 0, 1) {
  124. return ErrAlreadyConnected
  125. }
  126. if conf == nil {
  127. conf = NewConfig()
  128. }
  129. err := conf.Validate()
  130. if err != nil {
  131. return err
  132. }
  133. b.lock.Lock()
  134. go withRecover(func() {
  135. defer b.lock.Unlock()
  136. dialer := net.Dialer{
  137. Timeout: conf.Net.DialTimeout,
  138. KeepAlive: conf.Net.KeepAlive,
  139. LocalAddr: conf.Net.LocalAddr,
  140. }
  141. if conf.Net.TLS.Enable {
  142. b.conn, b.connErr = tls.DialWithDialer(&dialer, "tcp", b.addr, conf.Net.TLS.Config)
  143. } else if conf.Net.Proxy.Enable {
  144. b.conn, b.connErr = conf.Net.Proxy.Dialer.Dial("tcp", b.addr)
  145. } else {
  146. b.conn, b.connErr = dialer.Dial("tcp", b.addr)
  147. }
  148. if b.connErr != nil {
  149. Logger.Printf("Failed to connect to broker %s: %s\n", b.addr, b.connErr)
  150. b.conn = nil
  151. atomic.StoreInt32(&b.opened, 0)
  152. return
  153. }
  154. b.conn = newBufConn(b.conn)
  155. b.conf = conf
  156. // Create or reuse the global metrics shared between brokers
  157. b.incomingByteRate = metrics.GetOrRegisterMeter("incoming-byte-rate", conf.MetricRegistry)
  158. b.requestRate = metrics.GetOrRegisterMeter("request-rate", conf.MetricRegistry)
  159. b.requestSize = getOrRegisterHistogram("request-size", conf.MetricRegistry)
  160. b.requestLatency = getOrRegisterHistogram("request-latency-in-ms", conf.MetricRegistry)
  161. b.outgoingByteRate = metrics.GetOrRegisterMeter("outgoing-byte-rate", conf.MetricRegistry)
  162. b.responseRate = metrics.GetOrRegisterMeter("response-rate", conf.MetricRegistry)
  163. b.responseSize = getOrRegisterHistogram("response-size", conf.MetricRegistry)
  164. b.requestsInFlight = metrics.GetOrRegisterCounter("requests-in-flight", conf.MetricRegistry)
  165. // Do not gather metrics for seeded broker (only used during bootstrap) because they share
  166. // the same id (-1) and are already exposed through the global metrics above
  167. if b.id >= 0 {
  168. b.registerMetrics()
  169. }
  170. if conf.Net.SASL.Enable {
  171. b.connErr = b.authenticateViaSASL()
  172. if b.connErr != nil {
  173. err = b.conn.Close()
  174. if err == nil {
  175. Logger.Printf("Closed connection to broker %s\n", b.addr)
  176. } else {
  177. Logger.Printf("Error while closing connection to broker %s: %s\n", b.addr, err)
  178. }
  179. b.conn = nil
  180. atomic.StoreInt32(&b.opened, 0)
  181. return
  182. }
  183. }
  184. b.done = make(chan bool)
  185. b.responses = make(chan responsePromise, b.conf.Net.MaxOpenRequests-1)
  186. if b.id >= 0 {
  187. Logger.Printf("Connected to broker at %s (registered as #%d)\n", b.addr, b.id)
  188. } else {
  189. Logger.Printf("Connected to broker at %s (unregistered)\n", b.addr)
  190. }
  191. go withRecover(b.responseReceiver)
  192. })
  193. return nil
  194. }
  195. // Connected returns true if the broker is connected and false otherwise. If the broker is not
  196. // connected but it had tried to connect, the error from that connection attempt is also returned.
  197. func (b *Broker) Connected() (bool, error) {
  198. b.lock.Lock()
  199. defer b.lock.Unlock()
  200. return b.conn != nil, b.connErr
  201. }
  202. //Close closes the broker resources
  203. func (b *Broker) Close() error {
  204. b.lock.Lock()
  205. defer b.lock.Unlock()
  206. if b.conn == nil {
  207. return ErrNotConnected
  208. }
  209. close(b.responses)
  210. <-b.done
  211. err := b.conn.Close()
  212. b.conn = nil
  213. b.connErr = nil
  214. b.done = nil
  215. b.responses = nil
  216. b.unregisterMetrics()
  217. if err == nil {
  218. Logger.Printf("Closed connection to broker %s\n", b.addr)
  219. } else {
  220. Logger.Printf("Error while closing connection to broker %s: %s\n", b.addr, err)
  221. }
  222. atomic.StoreInt32(&b.opened, 0)
  223. return err
  224. }
  225. // ID returns the broker ID retrieved from Kafka's metadata, or -1 if that is not known.
  226. func (b *Broker) ID() int32 {
  227. return b.id // plain field read; b.lock is not taken here
  228. }
  229. // Addr returns the broker address as either retrieved from Kafka's metadata or passed to NewBroker.
  230. func (b *Broker) Addr() string {
  231. return b.addr // plain field read; b.lock is not taken here
  232. }
  233. // Rack returns the broker's rack as retrieved from Kafka's metadata or the
  234. // empty string if it is not known. The returned value corresponds to the
  235. // broker's broker.rack configuration setting. Requires protocol version to be
  236. // at least v0.10.0.0.
  237. func (b *Broker) Rack() string {
  238. if b.rack == nil {
  239. return ""
  240. }
  241. return *b.rack
  242. }
  243. //GetMetadata send a metadata request and returns a metadata response or error
  244. func (b *Broker) GetMetadata(request *MetadataRequest) (*MetadataResponse, error) {
  245. response := new(MetadataResponse)
  246. err := b.sendAndReceive(request, response)
  247. if err != nil {
  248. return nil, err
  249. }
  250. return response, nil
  251. }
  252. //GetConsumerMetadata send a consumer metadata request and returns a consumer metadata response or error
  253. func (b *Broker) GetConsumerMetadata(request *ConsumerMetadataRequest) (*ConsumerMetadataResponse, error) {
  254. response := new(ConsumerMetadataResponse)
  255. err := b.sendAndReceive(request, response)
  256. if err != nil {
  257. return nil, err
  258. }
  259. return response, nil
  260. }
  261. //FindCoordinator sends a find coordinate request and returns a response or error
  262. func (b *Broker) FindCoordinator(request *FindCoordinatorRequest) (*FindCoordinatorResponse, error) {
  263. response := new(FindCoordinatorResponse)
  264. err := b.sendAndReceive(request, response)
  265. if err != nil {
  266. return nil, err
  267. }
  268. return response, nil
  269. }
  270. //GetAvailableOffsets return an offset response or error
  271. func (b *Broker) GetAvailableOffsets(request *OffsetRequest) (*OffsetResponse, error) {
  272. response := new(OffsetResponse)
  273. err := b.sendAndReceive(request, response)
  274. if err != nil {
  275. return nil, err
  276. }
  277. return response, nil
  278. }
  279. //Produce returns a produce response or error
  280. func (b *Broker) Produce(request *ProduceRequest) (*ProduceResponse, error) {
  281. var (
  282. response *ProduceResponse
  283. err error
  284. )
  285. if request.RequiredAcks == NoResponse {
  286. err = b.sendAndReceive(request, nil)
  287. } else {
  288. response = new(ProduceResponse)
  289. err = b.sendAndReceive(request, response)
  290. }
  291. if err != nil {
  292. return nil, err
  293. }
  294. return response, nil
  295. }
  296. //Fetch returns a FetchResponse or error
  297. func (b *Broker) Fetch(request *FetchRequest) (*FetchResponse, error) {
  298. response := new(FetchResponse)
  299. err := b.sendAndReceive(request, response)
  300. if err != nil {
  301. return nil, err
  302. }
  303. return response, nil
  304. }
  305. //CommitOffset return an Offset commit reponse or error
  306. func (b *Broker) CommitOffset(request *OffsetCommitRequest) (*OffsetCommitResponse, error) {
  307. response := new(OffsetCommitResponse)
  308. err := b.sendAndReceive(request, response)
  309. if err != nil {
  310. return nil, err
  311. }
  312. return response, nil
  313. }
  314. //FetchOffset returns an offset fetch response or error
  315. func (b *Broker) FetchOffset(request *OffsetFetchRequest) (*OffsetFetchResponse, error) {
  316. response := new(OffsetFetchResponse)
  317. err := b.sendAndReceive(request, response)
  318. if err != nil {
  319. return nil, err
  320. }
  321. return response, nil
  322. }
  323. //JoinGroup returns a join group response or error
  324. func (b *Broker) JoinGroup(request *JoinGroupRequest) (*JoinGroupResponse, error) {
  325. response := new(JoinGroupResponse)
  326. err := b.sendAndReceive(request, response)
  327. if err != nil {
  328. return nil, err
  329. }
  330. return response, nil
  331. }
  332. //SyncGroup returns a sync group response or error
  333. func (b *Broker) SyncGroup(request *SyncGroupRequest) (*SyncGroupResponse, error) {
  334. response := new(SyncGroupResponse)
  335. err := b.sendAndReceive(request, response)
  336. if err != nil {
  337. return nil, err
  338. }
  339. return response, nil
  340. }
  341. //LeaveGroup return a leave group response or error
  342. func (b *Broker) LeaveGroup(request *LeaveGroupRequest) (*LeaveGroupResponse, error) {
  343. response := new(LeaveGroupResponse)
  344. err := b.sendAndReceive(request, response)
  345. if err != nil {
  346. return nil, err
  347. }
  348. return response, nil
  349. }
  350. //Heartbeat returns a heartbeat response or error
  351. func (b *Broker) Heartbeat(request *HeartbeatRequest) (*HeartbeatResponse, error) {
  352. response := new(HeartbeatResponse)
  353. err := b.sendAndReceive(request, response)
  354. if err != nil {
  355. return nil, err
  356. }
  357. return response, nil
  358. }
  359. //ListGroups return a list group response or error
  360. func (b *Broker) ListGroups(request *ListGroupsRequest) (*ListGroupsResponse, error) {
  361. response := new(ListGroupsResponse)
  362. err := b.sendAndReceive(request, response)
  363. if err != nil {
  364. return nil, err
  365. }
  366. return response, nil
  367. }
  368. //DescribeGroups return describe group response or error
  369. func (b *Broker) DescribeGroups(request *DescribeGroupsRequest) (*DescribeGroupsResponse, error) {
  370. response := new(DescribeGroupsResponse)
  371. err := b.sendAndReceive(request, response)
  372. if err != nil {
  373. return nil, err
  374. }
  375. return response, nil
  376. }
  377. //ApiVersions return api version response or error
  378. func (b *Broker) ApiVersions(request *ApiVersionsRequest) (*ApiVersionsResponse, error) {
  379. response := new(ApiVersionsResponse)
  380. err := b.sendAndReceive(request, response)
  381. if err != nil {
  382. return nil, err
  383. }
  384. return response, nil
  385. }
  386. //CreateTopics send a create topic request and returns create topic response
  387. func (b *Broker) CreateTopics(request *CreateTopicsRequest) (*CreateTopicsResponse, error) {
  388. response := new(CreateTopicsResponse)
  389. err := b.sendAndReceive(request, response)
  390. if err != nil {
  391. return nil, err
  392. }
  393. return response, nil
  394. }
  395. //DeleteTopics sends a delete topic request and returns delete topic response
  396. func (b *Broker) DeleteTopics(request *DeleteTopicsRequest) (*DeleteTopicsResponse, error) {
  397. response := new(DeleteTopicsResponse)
  398. err := b.sendAndReceive(request, response)
  399. if err != nil {
  400. return nil, err
  401. }
  402. return response, nil
  403. }
  404. //CreatePartitions sends a create partition request and returns create
  405. //partitions response or error
  406. func (b *Broker) CreatePartitions(request *CreatePartitionsRequest) (*CreatePartitionsResponse, error) {
  407. response := new(CreatePartitionsResponse)
  408. err := b.sendAndReceive(request, response)
  409. if err != nil {
  410. return nil, err
  411. }
  412. return response, nil
  413. }
  414. //DeleteRecords send a request to delete records and return delete record
  415. //response or error
  416. func (b *Broker) DeleteRecords(request *DeleteRecordsRequest) (*DeleteRecordsResponse, error) {
  417. response := new(DeleteRecordsResponse)
  418. err := b.sendAndReceive(request, response)
  419. if err != nil {
  420. return nil, err
  421. }
  422. return response, nil
  423. }
  424. //DescribeAcls sends a describe acl request and returns a response or error
  425. func (b *Broker) DescribeAcls(request *DescribeAclsRequest) (*DescribeAclsResponse, error) {
  426. response := new(DescribeAclsResponse)
  427. err := b.sendAndReceive(request, response)
  428. if err != nil {
  429. return nil, err
  430. }
  431. return response, nil
  432. }
  433. //CreateAcls sends a create acl request and returns a response or error
  434. func (b *Broker) CreateAcls(request *CreateAclsRequest) (*CreateAclsResponse, error) {
  435. response := new(CreateAclsResponse)
  436. err := b.sendAndReceive(request, response)
  437. if err != nil {
  438. return nil, err
  439. }
  440. return response, nil
  441. }
  442. //DeleteAcls sends a delete acl request and returns a response or error
  443. func (b *Broker) DeleteAcls(request *DeleteAclsRequest) (*DeleteAclsResponse, error) {
  444. response := new(DeleteAclsResponse)
  445. err := b.sendAndReceive(request, response)
  446. if err != nil {
  447. return nil, err
  448. }
  449. return response, nil
  450. }
  451. //InitProducerID sends an init producer request and returns a response or error
  452. func (b *Broker) InitProducerID(request *InitProducerIDRequest) (*InitProducerIDResponse, error) {
  453. response := new(InitProducerIDResponse)
  454. err := b.sendAndReceive(request, response)
  455. if err != nil {
  456. return nil, err
  457. }
  458. return response, nil
  459. }
  460. //AddPartitionsToTxn send a request to add partition to txn and returns
  461. //a response or error
  462. func (b *Broker) AddPartitionsToTxn(request *AddPartitionsToTxnRequest) (*AddPartitionsToTxnResponse, error) {
  463. response := new(AddPartitionsToTxnResponse)
  464. err := b.sendAndReceive(request, response)
  465. if err != nil {
  466. return nil, err
  467. }
  468. return response, nil
  469. }
  470. //AddOffsetsToTxn sends a request to add offsets to txn and returns a response
  471. //or error
  472. func (b *Broker) AddOffsetsToTxn(request *AddOffsetsToTxnRequest) (*AddOffsetsToTxnResponse, error) {
  473. response := new(AddOffsetsToTxnResponse)
  474. err := b.sendAndReceive(request, response)
  475. if err != nil {
  476. return nil, err
  477. }
  478. return response, nil
  479. }
  480. //EndTxn sends a request to end txn and returns a response or error
  481. func (b *Broker) EndTxn(request *EndTxnRequest) (*EndTxnResponse, error) {
  482. response := new(EndTxnResponse)
  483. err := b.sendAndReceive(request, response)
  484. if err != nil {
  485. return nil, err
  486. }
  487. return response, nil
  488. }
  489. //TxnOffsetCommit sends a request to commit transaction offsets and returns
  490. //a response or error
  491. func (b *Broker) TxnOffsetCommit(request *TxnOffsetCommitRequest) (*TxnOffsetCommitResponse, error) {
  492. response := new(TxnOffsetCommitResponse)
  493. err := b.sendAndReceive(request, response)
  494. if err != nil {
  495. return nil, err
  496. }
  497. return response, nil
  498. }
  499. //DescribeConfigs sends a request to describe config and returns a response or
  500. //error
  501. func (b *Broker) DescribeConfigs(request *DescribeConfigsRequest) (*DescribeConfigsResponse, error) {
  502. response := new(DescribeConfigsResponse)
  503. err := b.sendAndReceive(request, response)
  504. if err != nil {
  505. return nil, err
  506. }
  507. return response, nil
  508. }
  509. //AlterConfigs sends a request to alter config and return a response or error
  510. func (b *Broker) AlterConfigs(request *AlterConfigsRequest) (*AlterConfigsResponse, error) {
  511. response := new(AlterConfigsResponse)
  512. err := b.sendAndReceive(request, response)
  513. if err != nil {
  514. return nil, err
  515. }
  516. return response, nil
  517. }
  518. //DeleteGroups sends a request to delete groups and returns a response or error
  519. func (b *Broker) DeleteGroups(request *DeleteGroupsRequest) (*DeleteGroupsResponse, error) {
  520. response := new(DeleteGroupsResponse)
  521. if err := b.sendAndReceive(request, response); err != nil {
  522. return nil, err
  523. }
  524. return response, nil
  525. }
  526. //DescribeLogDirs sends a request to get the broker's log dir paths and sizes
  527. func (b *Broker) DescribeLogDirs(request *DescribeLogDirsRequest) (*DescribeLogDirsResponse, error) {
  528. response := new(DescribeLogDirsResponse)
  529. err := b.sendAndReceive(request, response)
  530. if err != nil {
  531. return nil, err
  532. }
  533. return response, nil
  534. }
  535. // readFull ensures the conn ReadDeadline has been setup before making a
  536. // call to io.ReadFull
  537. func (b *Broker) readFull(buf []byte) (n int, err error) {
  538. if err := b.conn.SetReadDeadline(time.Now().Add(b.conf.Net.ReadTimeout)); err != nil {
  539. return 0, err
  540. }
  541. return io.ReadFull(b.conn, buf)
  542. }
  543. // write ensures the conn WriteDeadline has been setup before making a
  544. // call to conn.Write
  545. func (b *Broker) write(buf []byte) (n int, err error) {
  546. if err := b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout)); err != nil {
  547. return 0, err
  548. }
  549. return b.conn.Write(buf)
  550. }
  551. func (b *Broker) send(rb protocolBody, promiseResponse bool) (*responsePromise, error) {
  552. b.lock.Lock()
  553. defer b.lock.Unlock()
  554. if b.conn == nil {
  555. if b.connErr != nil {
  556. return nil, b.connErr
  557. }
  558. return nil, ErrNotConnected
  559. }
  560. if !b.conf.Version.IsAtLeast(rb.requiredVersion()) {
  561. return nil, ErrUnsupportedVersion
  562. }
  563. req := &request{correlationID: b.correlationID, clientID: b.conf.ClientID, body: rb}
  564. buf, err := encode(req, b.conf.MetricRegistry)
  565. if err != nil {
  566. return nil, err
  567. }
  568. requestTime := time.Now()
  569. // Will be decremented in responseReceiver (except error or request with NoResponse)
  570. b.addRequestInFlightMetrics(1)
  571. bytes, err := b.write(buf)
  572. b.updateOutgoingCommunicationMetrics(bytes)
  573. if err != nil {
  574. b.addRequestInFlightMetrics(-1)
  575. return nil, err
  576. }
  577. b.correlationID++
  578. if !promiseResponse {
  579. // Record request latency without the response
  580. b.updateRequestLatencyAndInFlightMetrics(time.Since(requestTime))
  581. return nil, nil
  582. }
  583. promise := responsePromise{requestTime, req.correlationID, make(chan []byte), make(chan error)}
  584. b.responses <- promise
  585. return &promise, nil
  586. }
  587. func (b *Broker) sendAndReceive(req protocolBody, res versionedDecoder) error {
  588. promise, err := b.send(req, res != nil)
  589. if err != nil {
  590. return err
  591. }
  592. if promise == nil {
  593. return nil
  594. }
  595. select {
  596. case buf := <-promise.packets:
  597. return versionedDecode(buf, res, req.version())
  598. case err = <-promise.errors:
  599. return err
  600. }
  601. }
  602. func (b *Broker) decode(pd packetDecoder, version int16) (err error) {
  603. b.id, err = pd.getInt32()
  604. if err != nil {
  605. return err
  606. }
  607. host, err := pd.getString()
  608. if err != nil {
  609. return err
  610. }
  611. port, err := pd.getInt32()
  612. if err != nil {
  613. return err
  614. }
  615. if version >= 1 {
  616. b.rack, err = pd.getNullableString()
  617. if err != nil {
  618. return err
  619. }
  620. }
  621. b.addr = net.JoinHostPort(host, fmt.Sprint(port))
  622. if _, _, err := net.SplitHostPort(b.addr); err != nil {
  623. return err
  624. }
  625. return nil
  626. }
  627. func (b *Broker) encode(pe packetEncoder, version int16) (err error) {
  628. host, portstr, err := net.SplitHostPort(b.addr)
  629. if err != nil {
  630. return err
  631. }
  632. port, err := strconv.Atoi(portstr)
  633. if err != nil {
  634. return err
  635. }
  636. pe.putInt32(b.id)
  637. err = pe.putString(host)
  638. if err != nil {
  639. return err
  640. }
  641. pe.putInt32(int32(port))
  642. if version >= 1 {
  643. err = pe.putNullableString(b.rack)
  644. if err != nil {
  645. return err
  646. }
  647. }
  648. return nil
  649. }
  650. func (b *Broker) responseReceiver() {
  651. var dead error
  652. header := make([]byte, 8)
  653. for response := range b.responses {
  654. if dead != nil {
  655. // This was previously incremented in send() and
  656. // we are not calling updateIncomingCommunicationMetrics()
  657. b.addRequestInFlightMetrics(-1)
  658. response.errors <- dead
  659. continue
  660. }
  661. bytesReadHeader, err := b.readFull(header)
  662. requestLatency := time.Since(response.requestTime)
  663. if err != nil {
  664. b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
  665. dead = err
  666. response.errors <- err
  667. continue
  668. }
  669. decodedHeader := responseHeader{}
  670. err = decode(header, &decodedHeader)
  671. if err != nil {
  672. b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
  673. dead = err
  674. response.errors <- err
  675. continue
  676. }
  677. if decodedHeader.correlationID != response.correlationID {
  678. b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
  679. // TODO if decoded ID < cur ID, discard until we catch up
  680. // TODO if decoded ID > cur ID, save it so when cur ID catches up we have a response
  681. dead = PacketDecodingError{fmt.Sprintf("correlation ID didn't match, wanted %d, got %d", response.correlationID, decodedHeader.correlationID)}
  682. response.errors <- dead
  683. continue
  684. }
  685. buf := make([]byte, decodedHeader.length-4)
  686. bytesReadBody, err := b.readFull(buf)
  687. b.updateIncomingCommunicationMetrics(bytesReadHeader+bytesReadBody, requestLatency)
  688. if err != nil {
  689. dead = err
  690. response.errors <- err
  691. continue
  692. }
  693. response.packets <- buf
  694. }
  695. close(b.done)
  696. }
  697. func (b *Broker) authenticateViaSASL() error {
  698. switch b.conf.Net.SASL.Mechanism {
  699. case SASLTypeOAuth:
  700. return b.sendAndReceiveSASLOAuth(b.conf.Net.SASL.TokenProvider)
  701. case SASLTypeSCRAMSHA256, SASLTypeSCRAMSHA512:
  702. return b.sendAndReceiveSASLSCRAMv1()
  703. case SASLTypeGSSAPI:
  704. return b.sendAndReceiveKerberos()
  705. default:
  706. return b.sendAndReceiveSASLPlainAuth()
  707. }
  708. }
  709. func (b *Broker) sendAndReceiveKerberos() error {
  710. b.kerberosAuthenticator.Config = &b.conf.Net.SASL.GSSAPI
  711. if b.kerberosAuthenticator.NewKerberosClientFunc == nil {
  712. b.kerberosAuthenticator.NewKerberosClientFunc = NewKerberosClient
  713. }
  714. return b.kerberosAuthenticator.Authorize(b)
  715. }
  716. func (b *Broker) sendAndReceiveSASLHandshake(saslType SASLMechanism, version int16) error {
  717. rb := &SaslHandshakeRequest{Mechanism: string(saslType), Version: version}
  718. req := &request{correlationID: b.correlationID, clientID: b.conf.ClientID, body: rb}
  719. buf, err := encode(req, b.conf.MetricRegistry)
  720. if err != nil {
  721. return err
  722. }
  723. requestTime := time.Now()
  724. // Will be decremented in updateIncomingCommunicationMetrics (except error)
  725. b.addRequestInFlightMetrics(1)
  726. bytes, err := b.write(buf)
  727. b.updateOutgoingCommunicationMetrics(bytes)
  728. if err != nil {
  729. b.addRequestInFlightMetrics(-1)
  730. Logger.Printf("Failed to send SASL handshake %s: %s\n", b.addr, err.Error())
  731. return err
  732. }
  733. b.correlationID++
  734. header := make([]byte, 8) // response header
  735. _, err = b.readFull(header)
  736. if err != nil {
  737. b.addRequestInFlightMetrics(-1)
  738. Logger.Printf("Failed to read SASL handshake header : %s\n", err.Error())
  739. return err
  740. }
  741. length := binary.BigEndian.Uint32(header[:4])
  742. payload := make([]byte, length-4)
  743. n, err := b.readFull(payload)
  744. if err != nil {
  745. b.addRequestInFlightMetrics(-1)
  746. Logger.Printf("Failed to read SASL handshake payload : %s\n", err.Error())
  747. return err
  748. }
  749. b.updateIncomingCommunicationMetrics(n+8, time.Since(requestTime))
  750. res := &SaslHandshakeResponse{}
  751. err = versionedDecode(payload, res, 0)
  752. if err != nil {
  753. Logger.Printf("Failed to parse SASL handshake : %s\n", err.Error())
  754. return err
  755. }
  756. if res.Err != ErrNoError {
  757. Logger.Printf("Invalid SASL Mechanism : %s\n", res.Err.Error())
  758. return res.Err
  759. }
  760. Logger.Print("Successful SASL handshake. Available mechanisms: ", res.EnabledMechanisms)
  761. return nil
  762. }
  763. // Kafka 0.10.x supported SASL PLAIN/Kerberos via KAFKA-3149 (KIP-43).
  764. // Kafka 1.x.x onward added a SaslAuthenticate request/response message which
  765. // wraps the SASL flow in the Kafka protocol, which allows for returning
  766. // meaningful errors on authentication failure.
  767. //
  768. // In SASL Plain, Kafka expects the auth header to be in the following format
  769. // Message format (from https://tools.ietf.org/html/rfc4616):
  770. //
  771. // message = [authzid] UTF8NUL authcid UTF8NUL passwd
  772. // authcid = 1*SAFE ; MUST accept up to 255 octets
  773. // authzid = 1*SAFE ; MUST accept up to 255 octets
  774. // passwd = 1*SAFE ; MUST accept up to 255 octets
  775. // UTF8NUL = %x00 ; UTF-8 encoded NUL character
  776. //
  777. // SAFE = UTF1 / UTF2 / UTF3 / UTF4
  778. // ;; any UTF-8 encoded Unicode character except NUL
  779. //
  780. // With SASL v0 handshake and auth then:
  781. // When credentials are valid, Kafka returns a 4 byte array of null characters.
  782. // When credentials are invalid, Kafka closes the connection.
  783. //
  784. // With SASL v1 handshake and auth then:
  785. // When credentials are invalid, Kafka replies with a SaslAuthenticate response
  786. // containing an error code and message detailing the authentication failure.
  787. func (b *Broker) sendAndReceiveSASLPlainAuth() error {
  788. // default to V0 to allow for backward compatability when SASL is enabled
  789. // but not the handshake
  790. if b.conf.Net.SASL.Handshake {
  791. handshakeErr := b.sendAndReceiveSASLHandshake(SASLTypePlaintext, b.conf.Net.SASL.Version)
  792. if handshakeErr != nil {
  793. Logger.Printf("Error while performing SASL handshake %s\n", b.addr)
  794. return handshakeErr
  795. }
  796. }
  797. if b.conf.Net.SASL.Version == SASLHandshakeV1 {
  798. return b.sendAndReceiveV1SASLPlainAuth()
  799. }
  800. return b.sendAndReceiveV0SASLPlainAuth()
  801. }
  802. // sendAndReceiveV0SASLPlainAuth flows the v0 sasl auth NOT wrapped in the kafka protocol
  803. func (b *Broker) sendAndReceiveV0SASLPlainAuth() error {
  804. length := len(b.conf.Net.SASL.AuthIdentity) + 1 + len(b.conf.Net.SASL.User) + 1 + len(b.conf.Net.SASL.Password)
  805. authBytes := make([]byte, length+4) //4 byte length header + auth data
  806. binary.BigEndian.PutUint32(authBytes, uint32(length))
  807. copy(authBytes[4:], []byte(b.conf.Net.SASL.AuthIdentity+"\x00"+b.conf.Net.SASL.User+"\x00"+b.conf.Net.SASL.Password))
  808. requestTime := time.Now()
  809. // Will be decremented in updateIncomingCommunicationMetrics (except error)
  810. b.addRequestInFlightMetrics(1)
  811. bytesWritten, err := b.write(authBytes)
  812. b.updateOutgoingCommunicationMetrics(bytesWritten)
  813. if err != nil {
  814. b.addRequestInFlightMetrics(-1)
  815. Logger.Printf("Failed to write SASL auth header to broker %s: %s\n", b.addr, err.Error())
  816. return err
  817. }
  818. header := make([]byte, 4)
  819. n, err := b.readFull(header)
  820. b.updateIncomingCommunicationMetrics(n, time.Since(requestTime))
  821. // If the credentials are valid, we would get a 4 byte response filled with null characters.
  822. // Otherwise, the broker closes the connection and we get an EOF
  823. if err != nil {
  824. Logger.Printf("Failed to read response while authenticating with SASL to broker %s: %s\n", b.addr, err.Error())
  825. return err
  826. }
  827. Logger.Printf("SASL authentication successful with broker %s:%v - %v\n", b.addr, n, header)
  828. return nil
  829. }
  830. // sendAndReceiveV1SASLPlainAuth flows the v1 sasl authentication using the kafka protocol
  831. func (b *Broker) sendAndReceiveV1SASLPlainAuth() error {
  832. correlationID := b.correlationID
  833. requestTime := time.Now()
  834. // Will be decremented in updateIncomingCommunicationMetrics (except error)
  835. b.addRequestInFlightMetrics(1)
  836. bytesWritten, err := b.sendSASLPlainAuthClientResponse(correlationID)
  837. b.updateOutgoingCommunicationMetrics(bytesWritten)
  838. if err != nil {
  839. b.addRequestInFlightMetrics(-1)
  840. Logger.Printf("Failed to write SASL auth header to broker %s: %s\n", b.addr, err.Error())
  841. return err
  842. }
  843. b.correlationID++
  844. bytesRead, err := b.receiveSASLServerResponse(&SaslAuthenticateResponse{}, correlationID)
  845. b.updateIncomingCommunicationMetrics(bytesRead, time.Since(requestTime))
  846. // With v1 sasl we get an error message set in the response we can return
  847. if err != nil {
  848. Logger.Printf("Error returned from broker during SASL flow %s: %s\n", b.addr, err.Error())
  849. return err
  850. }
  851. return nil
  852. }
  853. // sendAndReceiveSASLOAuth performs the authentication flow as described by KIP-255
  854. // https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=75968876
  855. func (b *Broker) sendAndReceiveSASLOAuth(provider AccessTokenProvider) error {
  856. if err := b.sendAndReceiveSASLHandshake(SASLTypeOAuth, SASLHandshakeV1); err != nil {
  857. return err
  858. }
  859. token, err := provider.Token()
  860. if err != nil {
  861. return err
  862. }
  863. message, err := buildClientFirstMessage(token)
  864. if err != nil {
  865. return err
  866. }
  867. challenged, err := b.sendClientMessage(message)
  868. if err != nil {
  869. return err
  870. }
  871. if challenged {
  872. // Abort the token exchange. The broker returns the failure code.
  873. _, err = b.sendClientMessage([]byte(`\x01`))
  874. }
  875. return err
  876. }
  877. // sendClientMessage sends a SASL/OAUTHBEARER client message and returns true
  878. // if the broker responds with a challenge, in which case the token is
  879. // rejected.
  880. func (b *Broker) sendClientMessage(message []byte) (bool, error) {
  881. requestTime := time.Now()
  882. // Will be decremented in updateIncomingCommunicationMetrics (except error)
  883. b.addRequestInFlightMetrics(1)
  884. correlationID := b.correlationID
  885. bytesWritten, err := b.sendSASLOAuthBearerClientMessage(message, correlationID)
  886. b.updateOutgoingCommunicationMetrics(bytesWritten)
  887. if err != nil {
  888. b.addRequestInFlightMetrics(-1)
  889. return false, err
  890. }
  891. b.correlationID++
  892. res := &SaslAuthenticateResponse{}
  893. bytesRead, err := b.receiveSASLServerResponse(res, correlationID)
  894. requestLatency := time.Since(requestTime)
  895. b.updateIncomingCommunicationMetrics(bytesRead, requestLatency)
  896. isChallenge := len(res.SaslAuthBytes) > 0
  897. if isChallenge && err != nil {
  898. Logger.Printf("Broker rejected authentication token: %s", res.SaslAuthBytes)
  899. }
  900. return isChallenge, err
  901. }
  902. func (b *Broker) sendAndReceiveSASLSCRAMv1() error {
  903. if err := b.sendAndReceiveSASLHandshake(b.conf.Net.SASL.Mechanism, SASLHandshakeV1); err != nil {
  904. return err
  905. }
  906. scramClient := b.conf.Net.SASL.SCRAMClientGeneratorFunc()
  907. if err := scramClient.Begin(b.conf.Net.SASL.User, b.conf.Net.SASL.Password, b.conf.Net.SASL.SCRAMAuthzID); err != nil {
  908. return fmt.Errorf("failed to start SCRAM exchange with the server: %s", err.Error())
  909. }
  910. msg, err := scramClient.Step("")
  911. if err != nil {
  912. return fmt.Errorf("failed to advance the SCRAM exchange: %s", err.Error())
  913. }
  914. for !scramClient.Done() {
  915. requestTime := time.Now()
  916. // Will be decremented in updateIncomingCommunicationMetrics (except error)
  917. b.addRequestInFlightMetrics(1)
  918. correlationID := b.correlationID
  919. bytesWritten, err := b.sendSaslAuthenticateRequest(correlationID, []byte(msg))
  920. b.updateOutgoingCommunicationMetrics(bytesWritten)
  921. if err != nil {
  922. b.addRequestInFlightMetrics(-1)
  923. Logger.Printf("Failed to write SASL auth header to broker %s: %s\n", b.addr, err.Error())
  924. return err
  925. }
  926. b.correlationID++
  927. challenge, err := b.receiveSaslAuthenticateResponse(correlationID)
  928. if err != nil {
  929. b.addRequestInFlightMetrics(-1)
  930. Logger.Printf("Failed to read response while authenticating with SASL to broker %s: %s\n", b.addr, err.Error())
  931. return err
  932. }
  933. b.updateIncomingCommunicationMetrics(len(challenge), time.Since(requestTime))
  934. msg, err = scramClient.Step(string(challenge))
  935. if err != nil {
  936. Logger.Println("SASL authentication failed", err)
  937. return err
  938. }
  939. }
  940. Logger.Println("SASL authentication succeeded")
  941. return nil
  942. }
  943. func (b *Broker) sendSaslAuthenticateRequest(correlationID int32, msg []byte) (int, error) {
  944. rb := &SaslAuthenticateRequest{msg}
  945. req := &request{correlationID: correlationID, clientID: b.conf.ClientID, body: rb}
  946. buf, err := encode(req, b.conf.MetricRegistry)
  947. if err != nil {
  948. return 0, err
  949. }
  950. return b.write(buf)
  951. }
  952. func (b *Broker) receiveSaslAuthenticateResponse(correlationID int32) ([]byte, error) {
  953. buf := make([]byte, responseLengthSize+correlationIDSize)
  954. _, err := b.readFull(buf)
  955. if err != nil {
  956. return nil, err
  957. }
  958. header := responseHeader{}
  959. err = decode(buf, &header)
  960. if err != nil {
  961. return nil, err
  962. }
  963. if header.correlationID != correlationID {
  964. return nil, fmt.Errorf("correlation ID didn't match, wanted %d, got %d", b.correlationID, header.correlationID)
  965. }
  966. buf = make([]byte, header.length-correlationIDSize)
  967. _, err = b.readFull(buf)
  968. if err != nil {
  969. return nil, err
  970. }
  971. res := &SaslAuthenticateResponse{}
  972. if err := versionedDecode(buf, res, 0); err != nil {
  973. return nil, err
  974. }
  975. if res.Err != ErrNoError {
  976. return nil, res.Err
  977. }
  978. return res.SaslAuthBytes, nil
  979. }
  980. // Build SASL/OAUTHBEARER initial client response as described by RFC-7628
  981. // https://tools.ietf.org/html/rfc7628
  982. func buildClientFirstMessage(token *AccessToken) ([]byte, error) {
  983. var ext string
  984. if token.Extensions != nil && len(token.Extensions) > 0 {
  985. if _, ok := token.Extensions[SASLExtKeyAuth]; ok {
  986. return []byte{}, fmt.Errorf("the extension `%s` is invalid", SASLExtKeyAuth)
  987. }
  988. ext = "\x01" + mapToString(token.Extensions, "=", "\x01")
  989. }
  990. resp := []byte(fmt.Sprintf("n,,\x01auth=Bearer %s%s\x01\x01", token.Token, ext))
  991. return resp, nil
  992. }
  993. // mapToString returns a list of key-value pairs ordered by key.
  994. // keyValSep separates the key from the value. elemSep separates each pair.
  995. func mapToString(extensions map[string]string, keyValSep string, elemSep string) string {
  996. buf := make([]string, 0, len(extensions))
  997. for k, v := range extensions {
  998. buf = append(buf, k+keyValSep+v)
  999. }
  1000. sort.Strings(buf)
  1001. return strings.Join(buf, elemSep)
  1002. }
  1003. func (b *Broker) sendSASLPlainAuthClientResponse(correlationID int32) (int, error) {
  1004. authBytes := []byte(b.conf.Net.SASL.AuthIdentity + "\x00" + b.conf.Net.SASL.User + "\x00" + b.conf.Net.SASL.Password)
  1005. rb := &SaslAuthenticateRequest{authBytes}
  1006. req := &request{correlationID: correlationID, clientID: b.conf.ClientID, body: rb}
  1007. buf, err := encode(req, b.conf.MetricRegistry)
  1008. if err != nil {
  1009. return 0, err
  1010. }
  1011. return b.write(buf)
  1012. }
  1013. func (b *Broker) sendSASLOAuthBearerClientMessage(initialResp []byte, correlationID int32) (int, error) {
  1014. rb := &SaslAuthenticateRequest{initialResp}
  1015. req := &request{correlationID: correlationID, clientID: b.conf.ClientID, body: rb}
  1016. buf, err := encode(req, b.conf.MetricRegistry)
  1017. if err != nil {
  1018. return 0, err
  1019. }
  1020. return b.write(buf)
  1021. }
  1022. func (b *Broker) receiveSASLServerResponse(res *SaslAuthenticateResponse, correlationID int32) (int, error) {
  1023. buf := make([]byte, responseLengthSize+correlationIDSize)
  1024. bytesRead, err := b.readFull(buf)
  1025. if err != nil {
  1026. return bytesRead, err
  1027. }
  1028. header := responseHeader{}
  1029. err = decode(buf, &header)
  1030. if err != nil {
  1031. return bytesRead, err
  1032. }
  1033. if header.correlationID != correlationID {
  1034. return bytesRead, fmt.Errorf("correlation ID didn't match, wanted %d, got %d", b.correlationID, header.correlationID)
  1035. }
  1036. buf = make([]byte, header.length-correlationIDSize)
  1037. c, err := b.readFull(buf)
  1038. bytesRead += c
  1039. if err != nil {
  1040. return bytesRead, err
  1041. }
  1042. if err := versionedDecode(buf, res, 0); err != nil {
  1043. return bytesRead, err
  1044. }
  1045. if res.Err != ErrNoError {
  1046. return bytesRead, res.Err
  1047. }
  1048. return bytesRead, nil
  1049. }
  1050. func (b *Broker) updateIncomingCommunicationMetrics(bytes int, requestLatency time.Duration) {
  1051. b.updateRequestLatencyAndInFlightMetrics(requestLatency)
  1052. b.responseRate.Mark(1)
  1053. if b.brokerResponseRate != nil {
  1054. b.brokerResponseRate.Mark(1)
  1055. }
  1056. responseSize := int64(bytes)
  1057. b.incomingByteRate.Mark(responseSize)
  1058. if b.brokerIncomingByteRate != nil {
  1059. b.brokerIncomingByteRate.Mark(responseSize)
  1060. }
  1061. b.responseSize.Update(responseSize)
  1062. if b.brokerResponseSize != nil {
  1063. b.brokerResponseSize.Update(responseSize)
  1064. }
  1065. }
  1066. func (b *Broker) updateRequestLatencyAndInFlightMetrics(requestLatency time.Duration) {
  1067. requestLatencyInMs := int64(requestLatency / time.Millisecond)
  1068. b.requestLatency.Update(requestLatencyInMs)
  1069. if b.brokerRequestLatency != nil {
  1070. b.brokerRequestLatency.Update(requestLatencyInMs)
  1071. }
  1072. b.addRequestInFlightMetrics(-1)
  1073. }
  1074. func (b *Broker) addRequestInFlightMetrics(i int64) {
  1075. b.requestsInFlight.Inc(i)
  1076. if b.brokerRequestsInFlight != nil {
  1077. b.brokerRequestsInFlight.Inc(i)
  1078. }
  1079. }
  1080. func (b *Broker) updateOutgoingCommunicationMetrics(bytes int) {
  1081. b.requestRate.Mark(1)
  1082. if b.brokerRequestRate != nil {
  1083. b.brokerRequestRate.Mark(1)
  1084. }
  1085. requestSize := int64(bytes)
  1086. b.outgoingByteRate.Mark(requestSize)
  1087. if b.brokerOutgoingByteRate != nil {
  1088. b.brokerOutgoingByteRate.Mark(requestSize)
  1089. }
  1090. b.requestSize.Update(requestSize)
  1091. if b.brokerRequestSize != nil {
  1092. b.brokerRequestSize.Update(requestSize)
  1093. }
  1094. }
  1095. func (b *Broker) registerMetrics() {
  1096. b.brokerIncomingByteRate = b.registerMeter("incoming-byte-rate")
  1097. b.brokerRequestRate = b.registerMeter("request-rate")
  1098. b.brokerRequestSize = b.registerHistogram("request-size")
  1099. b.brokerRequestLatency = b.registerHistogram("request-latency-in-ms")
  1100. b.brokerOutgoingByteRate = b.registerMeter("outgoing-byte-rate")
  1101. b.brokerResponseRate = b.registerMeter("response-rate")
  1102. b.brokerResponseSize = b.registerHistogram("response-size")
  1103. b.brokerRequestsInFlight = b.registerCounter("requests-in-flight")
  1104. }
  1105. func (b *Broker) unregisterMetrics() {
  1106. for _, name := range b.registeredMetrics {
  1107. b.conf.MetricRegistry.Unregister(name)
  1108. }
  1109. }
  1110. func (b *Broker) registerMeter(name string) metrics.Meter {
  1111. nameForBroker := getMetricNameForBroker(name, b)
  1112. b.registeredMetrics = append(b.registeredMetrics, nameForBroker)
  1113. return metrics.GetOrRegisterMeter(nameForBroker, b.conf.MetricRegistry)
  1114. }
  1115. func (b *Broker) registerHistogram(name string) metrics.Histogram {
  1116. nameForBroker := getMetricNameForBroker(name, b)
  1117. b.registeredMetrics = append(b.registeredMetrics, nameForBroker)
  1118. return getOrRegisterHistogram(nameForBroker, b.conf.MetricRegistry)
  1119. }
  1120. func (b *Broker) registerCounter(name string) metrics.Counter {
  1121. nameForBroker := getMetricNameForBroker(name, b)
  1122. b.registeredMetrics = append(b.registeredMetrics, nameForBroker)
  1123. return metrics.GetOrRegisterCounter(nameForBroker, b.conf.MetricRegistry)
  1124. }