broker.go 37 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362
  1. package sarama
  2. import (
  3. "crypto/tls"
  4. "encoding/binary"
  5. "fmt"
  6. "io"
  7. "net"
  8. "sort"
  9. "strconv"
  10. "strings"
  11. "sync"
  12. "sync/atomic"
  13. "time"
  14. metrics "github.com/rcrowley/go-metrics"
  15. )
  16. // Broker represents a single Kafka broker connection. All operations on this object are entirely concurrency-safe.
  17. type Broker struct {
  18. conf *Config
  19. rack *string
  20. id int32
  21. addr string
  22. correlationID int32
  23. conn net.Conn
  24. connErr error
  25. lock sync.Mutex
  26. opened int32
  27. responses chan responsePromise
  28. done chan bool
  29. registeredMetrics []string
  30. incomingByteRate metrics.Meter
  31. requestRate metrics.Meter
  32. requestSize metrics.Histogram
  33. requestLatency metrics.Histogram
  34. outgoingByteRate metrics.Meter
  35. responseRate metrics.Meter
  36. responseSize metrics.Histogram
  37. brokerIncomingByteRate metrics.Meter
  38. brokerRequestRate metrics.Meter
  39. brokerRequestSize metrics.Histogram
  40. brokerRequestLatency metrics.Histogram
  41. brokerOutgoingByteRate metrics.Meter
  42. brokerResponseRate metrics.Meter
  43. brokerResponseSize metrics.Histogram
  44. kerberosAuthenticator GSSAPIKerberosAuth
  45. }
  46. // SASLMechanism specifies the SASL mechanism the client uses to authenticate with the broker
  47. type SASLMechanism string
  48. const (
  49. // SASLTypeOAuth represents the SASL/OAUTHBEARER mechanism (Kafka 2.0.0+)
  50. SASLTypeOAuth = "OAUTHBEARER"
  51. // SASLTypePlaintext represents the SASL/PLAIN mechanism
  52. SASLTypePlaintext = "PLAIN"
  53. // SASLTypeSCRAMSHA256 represents the SCRAM-SHA-256 mechanism.
  54. SASLTypeSCRAMSHA256 = "SCRAM-SHA-256"
  55. // SASLTypeSCRAMSHA512 represents the SCRAM-SHA-512 mechanism.
  56. SASLTypeSCRAMSHA512 = "SCRAM-SHA-512"
  57. SASLTypeGSSAPI = "GSSAPI"
  58. // SASLHandshakeV0 is v0 of the Kafka SASL handshake protocol. Client and
  59. // server negotiate SASL auth using opaque packets.
  60. SASLHandshakeV0 = int16(0)
  61. // SASLHandshakeV1 is v1 of the Kafka SASL handshake protocol. Client and
  62. // server negotiate SASL by wrapping tokens with Kafka protocol headers.
  63. SASLHandshakeV1 = int16(1)
  64. // SASLExtKeyAuth is the reserved extension key name sent as part of the
  65. // SASL/OAUTHBEARER intial client response
  66. SASLExtKeyAuth = "auth"
  67. )
  68. // AccessToken contains an access token used to authenticate a
  69. // SASL/OAUTHBEARER client along with associated metadata.
  70. type AccessToken struct {
  71. // Token is the access token payload.
  72. Token string
  73. // Extensions is a optional map of arbitrary key-value pairs that can be
  74. // sent with the SASL/OAUTHBEARER initial client response. These values are
  75. // ignored by the SASL server if they are unexpected. This feature is only
  76. // supported by Kafka >= 2.1.0.
  77. Extensions map[string]string
  78. }
  79. // AccessTokenProvider is the interface that encapsulates how implementors
  80. // can generate access tokens for Kafka broker authentication.
  81. type AccessTokenProvider interface {
  82. // Token returns an access token. The implementation should ensure token
  83. // reuse so that multiple calls at connect time do not create multiple
  84. // tokens. The implementation should also periodically refresh the token in
  85. // order to guarantee that each call returns an unexpired token. This
  86. // method should not block indefinitely--a timeout error should be returned
  87. // after a short period of inactivity so that the broker connection logic
  88. // can log debugging information and retry.
  89. Token() (*AccessToken, error)
  90. }
  91. // SCRAMClient is a an interface to a SCRAM
  92. // client implementation.
  93. type SCRAMClient interface {
  94. // Begin prepares the client for the SCRAM exchange
  95. // with the server with a user name and a password
  96. Begin(userName, password, authzID string) error
  97. // Step steps client through the SCRAM exchange. It is
  98. // called repeatedly until it errors or `Done` returns true.
  99. Step(challenge string) (response string, err error)
  100. // Done should return true when the SCRAM conversation
  101. // is over.
  102. Done() bool
  103. }
  104. type responsePromise struct {
  105. requestTime time.Time
  106. correlationID int32
  107. packets chan []byte
  108. errors chan error
  109. }
  110. // NewBroker creates and returns a Broker targeting the given host:port address.
  111. // This does not attempt to actually connect, you have to call Open() for that.
  112. func NewBroker(addr string) *Broker {
  113. return &Broker{id: -1, addr: addr}
  114. }
  115. // Open tries to connect to the Broker if it is not already connected or connecting, but does not block
  116. // waiting for the connection to complete. This means that any subsequent operations on the broker will
  117. // block waiting for the connection to succeed or fail. To get the effect of a fully synchronous Open call,
  118. // follow it by a call to Connected(). The only errors Open will return directly are ConfigurationError or
  119. // AlreadyConnected. If conf is nil, the result of NewConfig() is used.
  120. func (b *Broker) Open(conf *Config) error {
  121. if !atomic.CompareAndSwapInt32(&b.opened, 0, 1) {
  122. return ErrAlreadyConnected
  123. }
  124. if conf == nil {
  125. conf = NewConfig()
  126. }
  127. err := conf.Validate()
  128. if err != nil {
  129. return err
  130. }
  131. b.lock.Lock()
  132. go withRecover(func() {
  133. defer b.lock.Unlock()
  134. dialer := net.Dialer{
  135. Timeout: conf.Net.DialTimeout,
  136. KeepAlive: conf.Net.KeepAlive,
  137. LocalAddr: conf.Net.LocalAddr,
  138. }
  139. if conf.Net.TLS.Enable {
  140. b.conn, b.connErr = tls.DialWithDialer(&dialer, "tcp", b.addr, conf.Net.TLS.Config)
  141. } else if conf.Net.Proxy.Enable {
  142. b.conn, b.connErr = conf.Net.Proxy.Dialer.Dial("tcp", b.addr)
  143. } else {
  144. b.conn, b.connErr = dialer.Dial("tcp", b.addr)
  145. }
  146. if b.connErr != nil {
  147. Logger.Printf("Failed to connect to broker %s: %s\n", b.addr, b.connErr)
  148. b.conn = nil
  149. atomic.StoreInt32(&b.opened, 0)
  150. return
  151. }
  152. b.conn = newBufConn(b.conn)
  153. b.conf = conf
  154. // Create or reuse the global metrics shared between brokers
  155. b.incomingByteRate = metrics.GetOrRegisterMeter("incoming-byte-rate", conf.MetricRegistry)
  156. b.requestRate = metrics.GetOrRegisterMeter("request-rate", conf.MetricRegistry)
  157. b.requestSize = getOrRegisterHistogram("request-size", conf.MetricRegistry)
  158. b.requestLatency = getOrRegisterHistogram("request-latency-in-ms", conf.MetricRegistry)
  159. b.outgoingByteRate = metrics.GetOrRegisterMeter("outgoing-byte-rate", conf.MetricRegistry)
  160. b.responseRate = metrics.GetOrRegisterMeter("response-rate", conf.MetricRegistry)
  161. b.responseSize = getOrRegisterHistogram("response-size", conf.MetricRegistry)
  162. // Do not gather metrics for seeded broker (only used during bootstrap) because they share
  163. // the same id (-1) and are already exposed through the global metrics above
  164. if b.id >= 0 {
  165. b.registerMetrics()
  166. }
  167. if conf.Net.SASL.Enable {
  168. b.connErr = b.authenticateViaSASL()
  169. if b.connErr != nil {
  170. err = b.conn.Close()
  171. if err == nil {
  172. Logger.Printf("Closed connection to broker %s\n", b.addr)
  173. } else {
  174. Logger.Printf("Error while closing connection to broker %s: %s\n", b.addr, err)
  175. }
  176. b.conn = nil
  177. atomic.StoreInt32(&b.opened, 0)
  178. return
  179. }
  180. }
  181. b.done = make(chan bool)
  182. b.responses = make(chan responsePromise, b.conf.Net.MaxOpenRequests-1)
  183. if b.id >= 0 {
  184. Logger.Printf("Connected to broker at %s (registered as #%d)\n", b.addr, b.id)
  185. } else {
  186. Logger.Printf("Connected to broker at %s (unregistered)\n", b.addr)
  187. }
  188. go withRecover(b.responseReceiver)
  189. })
  190. return nil
  191. }
  192. // Connected returns true if the broker is connected and false otherwise. If the broker is not
  193. // connected but it had tried to connect, the error from that connection attempt is also returned.
  194. func (b *Broker) Connected() (bool, error) {
  195. b.lock.Lock()
  196. defer b.lock.Unlock()
  197. return b.conn != nil, b.connErr
  198. }
  199. //Close closes the broker resources
  200. func (b *Broker) Close() error {
  201. b.lock.Lock()
  202. defer b.lock.Unlock()
  203. if b.conn == nil {
  204. return ErrNotConnected
  205. }
  206. close(b.responses)
  207. <-b.done
  208. err := b.conn.Close()
  209. b.conn = nil
  210. b.connErr = nil
  211. b.done = nil
  212. b.responses = nil
  213. b.unregisterMetrics()
  214. if err == nil {
  215. Logger.Printf("Closed connection to broker %s\n", b.addr)
  216. } else {
  217. Logger.Printf("Error while closing connection to broker %s: %s\n", b.addr, err)
  218. }
  219. atomic.StoreInt32(&b.opened, 0)
  220. return err
  221. }
  222. // ID returns the broker ID retrieved from Kafka's metadata, or -1 if that is not known.
  223. func (b *Broker) ID() int32 {
  224. return b.id
  225. }
  226. // Addr returns the broker address as either retrieved from Kafka's metadata or passed to NewBroker.
  227. func (b *Broker) Addr() string {
  228. return b.addr
  229. }
  230. // Rack returns the broker's rack as retrieved from Kafka's metadata or the
  231. // empty string if it is not known. The returned value corresponds to the
  232. // broker's broker.rack configuration setting. Requires protocol version to be
  233. // at least v0.10.0.0.
  234. func (b *Broker) Rack() string {
  235. if b.rack == nil {
  236. return ""
  237. }
  238. return *b.rack
  239. }
  240. //GetMetadata send a metadata request and returns a metadata response or error
  241. func (b *Broker) GetMetadata(request *MetadataRequest) (*MetadataResponse, error) {
  242. response := new(MetadataResponse)
  243. err := b.sendAndReceive(request, response)
  244. if err != nil {
  245. return nil, err
  246. }
  247. return response, nil
  248. }
  249. //GetConsumerMetadata send a consumer metadata request and returns a consumer metadata response or error
  250. func (b *Broker) GetConsumerMetadata(request *ConsumerMetadataRequest) (*ConsumerMetadataResponse, error) {
  251. response := new(ConsumerMetadataResponse)
  252. err := b.sendAndReceive(request, response)
  253. if err != nil {
  254. return nil, err
  255. }
  256. return response, nil
  257. }
  258. //FindCoordinator sends a find coordinate request and returns a response or error
  259. func (b *Broker) FindCoordinator(request *FindCoordinatorRequest) (*FindCoordinatorResponse, error) {
  260. response := new(FindCoordinatorResponse)
  261. err := b.sendAndReceive(request, response)
  262. if err != nil {
  263. return nil, err
  264. }
  265. return response, nil
  266. }
  267. //GetAvailableOffsets return an offset response or error
  268. func (b *Broker) GetAvailableOffsets(request *OffsetRequest) (*OffsetResponse, error) {
  269. response := new(OffsetResponse)
  270. err := b.sendAndReceive(request, response)
  271. if err != nil {
  272. return nil, err
  273. }
  274. return response, nil
  275. }
  276. //Produce returns a produce response or error
  277. func (b *Broker) Produce(request *ProduceRequest) (*ProduceResponse, error) {
  278. var (
  279. response *ProduceResponse
  280. err error
  281. )
  282. if request.RequiredAcks == NoResponse {
  283. err = b.sendAndReceive(request, nil)
  284. } else {
  285. response = new(ProduceResponse)
  286. err = b.sendAndReceive(request, response)
  287. }
  288. if err != nil {
  289. return nil, err
  290. }
  291. return response, nil
  292. }
  293. //Fetch returns a FetchResponse or error
  294. func (b *Broker) Fetch(request *FetchRequest) (*FetchResponse, error) {
  295. response := new(FetchResponse)
  296. err := b.sendAndReceive(request, response)
  297. if err != nil {
  298. return nil, err
  299. }
  300. return response, nil
  301. }
  302. //CommitOffset return an Offset commit reponse or error
  303. func (b *Broker) CommitOffset(request *OffsetCommitRequest) (*OffsetCommitResponse, error) {
  304. response := new(OffsetCommitResponse)
  305. err := b.sendAndReceive(request, response)
  306. if err != nil {
  307. return nil, err
  308. }
  309. return response, nil
  310. }
  311. //FetchOffset returns an offset fetch response or error
  312. func (b *Broker) FetchOffset(request *OffsetFetchRequest) (*OffsetFetchResponse, error) {
  313. response := new(OffsetFetchResponse)
  314. err := b.sendAndReceive(request, response)
  315. if err != nil {
  316. return nil, err
  317. }
  318. return response, nil
  319. }
  320. //JoinGroup returns a join group response or error
  321. func (b *Broker) JoinGroup(request *JoinGroupRequest) (*JoinGroupResponse, error) {
  322. response := new(JoinGroupResponse)
  323. err := b.sendAndReceive(request, response)
  324. if err != nil {
  325. return nil, err
  326. }
  327. return response, nil
  328. }
  329. //SyncGroup returns a sync group response or error
  330. func (b *Broker) SyncGroup(request *SyncGroupRequest) (*SyncGroupResponse, error) {
  331. response := new(SyncGroupResponse)
  332. err := b.sendAndReceive(request, response)
  333. if err != nil {
  334. return nil, err
  335. }
  336. return response, nil
  337. }
  338. //LeaveGroup return a leave group response or error
  339. func (b *Broker) LeaveGroup(request *LeaveGroupRequest) (*LeaveGroupResponse, error) {
  340. response := new(LeaveGroupResponse)
  341. err := b.sendAndReceive(request, response)
  342. if err != nil {
  343. return nil, err
  344. }
  345. return response, nil
  346. }
  347. //Heartbeat returns a heartbeat response or error
  348. func (b *Broker) Heartbeat(request *HeartbeatRequest) (*HeartbeatResponse, error) {
  349. response := new(HeartbeatResponse)
  350. err := b.sendAndReceive(request, response)
  351. if err != nil {
  352. return nil, err
  353. }
  354. return response, nil
  355. }
  356. //ListGroups return a list group response or error
  357. func (b *Broker) ListGroups(request *ListGroupsRequest) (*ListGroupsResponse, error) {
  358. response := new(ListGroupsResponse)
  359. err := b.sendAndReceive(request, response)
  360. if err != nil {
  361. return nil, err
  362. }
  363. return response, nil
  364. }
  365. //DescribeGroups return describe group response or error
  366. func (b *Broker) DescribeGroups(request *DescribeGroupsRequest) (*DescribeGroupsResponse, error) {
  367. response := new(DescribeGroupsResponse)
  368. err := b.sendAndReceive(request, response)
  369. if err != nil {
  370. return nil, err
  371. }
  372. return response, nil
  373. }
  374. //ApiVersions return api version response or error
  375. func (b *Broker) ApiVersions(request *ApiVersionsRequest) (*ApiVersionsResponse, error) {
  376. response := new(ApiVersionsResponse)
  377. err := b.sendAndReceive(request, response)
  378. if err != nil {
  379. return nil, err
  380. }
  381. return response, nil
  382. }
  383. //CreateTopics send a create topic request and returns create topic response
  384. func (b *Broker) CreateTopics(request *CreateTopicsRequest) (*CreateTopicsResponse, error) {
  385. response := new(CreateTopicsResponse)
  386. err := b.sendAndReceive(request, response)
  387. if err != nil {
  388. return nil, err
  389. }
  390. return response, nil
  391. }
  392. //DeleteTopics sends a delete topic request and returns delete topic response
  393. func (b *Broker) DeleteTopics(request *DeleteTopicsRequest) (*DeleteTopicsResponse, error) {
  394. response := new(DeleteTopicsResponse)
  395. err := b.sendAndReceive(request, response)
  396. if err != nil {
  397. return nil, err
  398. }
  399. return response, nil
  400. }
  401. //CreatePartitions sends a create partition request and returns create
  402. //partitions response or error
  403. func (b *Broker) CreatePartitions(request *CreatePartitionsRequest) (*CreatePartitionsResponse, error) {
  404. response := new(CreatePartitionsResponse)
  405. err := b.sendAndReceive(request, response)
  406. if err != nil {
  407. return nil, err
  408. }
  409. return response, nil
  410. }
  411. //DeleteRecords send a request to delete records and return delete record
  412. //response or error
  413. func (b *Broker) DeleteRecords(request *DeleteRecordsRequest) (*DeleteRecordsResponse, error) {
  414. response := new(DeleteRecordsResponse)
  415. err := b.sendAndReceive(request, response)
  416. if err != nil {
  417. return nil, err
  418. }
  419. return response, nil
  420. }
  421. //DescribeAcls sends a describe acl request and returns a response or error
  422. func (b *Broker) DescribeAcls(request *DescribeAclsRequest) (*DescribeAclsResponse, error) {
  423. response := new(DescribeAclsResponse)
  424. err := b.sendAndReceive(request, response)
  425. if err != nil {
  426. return nil, err
  427. }
  428. return response, nil
  429. }
  430. //CreateAcls sends a create acl request and returns a response or error
  431. func (b *Broker) CreateAcls(request *CreateAclsRequest) (*CreateAclsResponse, error) {
  432. response := new(CreateAclsResponse)
  433. err := b.sendAndReceive(request, response)
  434. if err != nil {
  435. return nil, err
  436. }
  437. return response, nil
  438. }
  439. //DeleteAcls sends a delete acl request and returns a response or error
  440. func (b *Broker) DeleteAcls(request *DeleteAclsRequest) (*DeleteAclsResponse, error) {
  441. response := new(DeleteAclsResponse)
  442. err := b.sendAndReceive(request, response)
  443. if err != nil {
  444. return nil, err
  445. }
  446. return response, nil
  447. }
  448. //InitProducerID sends an init producer request and returns a response or error
  449. func (b *Broker) InitProducerID(request *InitProducerIDRequest) (*InitProducerIDResponse, error) {
  450. response := new(InitProducerIDResponse)
  451. err := b.sendAndReceive(request, response)
  452. if err != nil {
  453. return nil, err
  454. }
  455. return response, nil
  456. }
  457. //AddPartitionsToTxn send a request to add partition to txn and returns
  458. //a response or error
  459. func (b *Broker) AddPartitionsToTxn(request *AddPartitionsToTxnRequest) (*AddPartitionsToTxnResponse, error) {
  460. response := new(AddPartitionsToTxnResponse)
  461. err := b.sendAndReceive(request, response)
  462. if err != nil {
  463. return nil, err
  464. }
  465. return response, nil
  466. }
  467. //AddOffsetsToTxn sends a request to add offsets to txn and returns a response
  468. //or error
  469. func (b *Broker) AddOffsetsToTxn(request *AddOffsetsToTxnRequest) (*AddOffsetsToTxnResponse, error) {
  470. response := new(AddOffsetsToTxnResponse)
  471. err := b.sendAndReceive(request, response)
  472. if err != nil {
  473. return nil, err
  474. }
  475. return response, nil
  476. }
  477. //EndTxn sends a request to end txn and returns a response or error
  478. func (b *Broker) EndTxn(request *EndTxnRequest) (*EndTxnResponse, error) {
  479. response := new(EndTxnResponse)
  480. err := b.sendAndReceive(request, response)
  481. if err != nil {
  482. return nil, err
  483. }
  484. return response, nil
  485. }
  486. //TxnOffsetCommit sends a request to commit transaction offsets and returns
  487. //a response or error
  488. func (b *Broker) TxnOffsetCommit(request *TxnOffsetCommitRequest) (*TxnOffsetCommitResponse, error) {
  489. response := new(TxnOffsetCommitResponse)
  490. err := b.sendAndReceive(request, response)
  491. if err != nil {
  492. return nil, err
  493. }
  494. return response, nil
  495. }
  496. //DescribeConfigs sends a request to describe config and returns a response or
  497. //error
  498. func (b *Broker) DescribeConfigs(request *DescribeConfigsRequest) (*DescribeConfigsResponse, error) {
  499. response := new(DescribeConfigsResponse)
  500. err := b.sendAndReceive(request, response)
  501. if err != nil {
  502. return nil, err
  503. }
  504. return response, nil
  505. }
  506. //AlterConfigs sends a request to alter config and return a response or error
  507. func (b *Broker) AlterConfigs(request *AlterConfigsRequest) (*AlterConfigsResponse, error) {
  508. response := new(AlterConfigsResponse)
  509. err := b.sendAndReceive(request, response)
  510. if err != nil {
  511. return nil, err
  512. }
  513. return response, nil
  514. }
  515. //DeleteGroups sends a request to delete groups and returns a response or error
  516. func (b *Broker) DeleteGroups(request *DeleteGroupsRequest) (*DeleteGroupsResponse, error) {
  517. response := new(DeleteGroupsResponse)
  518. if err := b.sendAndReceive(request, response); err != nil {
  519. return nil, err
  520. }
  521. return response, nil
  522. }
  523. func (b *Broker) send(rb protocolBody, promiseResponse bool) (*responsePromise, error) {
  524. b.lock.Lock()
  525. defer b.lock.Unlock()
  526. if b.conn == nil {
  527. if b.connErr != nil {
  528. return nil, b.connErr
  529. }
  530. return nil, ErrNotConnected
  531. }
  532. if !b.conf.Version.IsAtLeast(rb.requiredVersion()) {
  533. return nil, ErrUnsupportedVersion
  534. }
  535. req := &request{correlationID: b.correlationID, clientID: b.conf.ClientID, body: rb}
  536. buf, err := encode(req, b.conf.MetricRegistry)
  537. if err != nil {
  538. return nil, err
  539. }
  540. err = b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout))
  541. if err != nil {
  542. return nil, err
  543. }
  544. requestTime := time.Now()
  545. bytes, err := b.conn.Write(buf)
  546. b.updateOutgoingCommunicationMetrics(bytes) //TODO: should it be after error check
  547. if err != nil {
  548. return nil, err
  549. }
  550. b.correlationID++
  551. if !promiseResponse {
  552. // Record request latency without the response
  553. b.updateRequestLatencyMetrics(time.Since(requestTime))
  554. return nil, nil
  555. }
  556. promise := responsePromise{requestTime, req.correlationID, make(chan []byte), make(chan error)}
  557. b.responses <- promise
  558. return &promise, nil
  559. }
  560. func (b *Broker) sendAndReceive(req protocolBody, res versionedDecoder) error {
  561. promise, err := b.send(req, res != nil)
  562. if err != nil {
  563. return err
  564. }
  565. if promise == nil {
  566. return nil
  567. }
  568. select {
  569. case buf := <-promise.packets:
  570. return versionedDecode(buf, res, req.version())
  571. case err = <-promise.errors:
  572. return err
  573. }
  574. }
  575. func (b *Broker) decode(pd packetDecoder, version int16) (err error) {
  576. b.id, err = pd.getInt32()
  577. if err != nil {
  578. return err
  579. }
  580. host, err := pd.getString()
  581. if err != nil {
  582. return err
  583. }
  584. port, err := pd.getInt32()
  585. if err != nil {
  586. return err
  587. }
  588. if version >= 1 {
  589. b.rack, err = pd.getNullableString()
  590. if err != nil {
  591. return err
  592. }
  593. }
  594. b.addr = net.JoinHostPort(host, fmt.Sprint(port))
  595. if _, _, err := net.SplitHostPort(b.addr); err != nil {
  596. return err
  597. }
  598. return nil
  599. }
  600. func (b *Broker) encode(pe packetEncoder, version int16) (err error) {
  601. host, portstr, err := net.SplitHostPort(b.addr)
  602. if err != nil {
  603. return err
  604. }
  605. port, err := strconv.Atoi(portstr)
  606. if err != nil {
  607. return err
  608. }
  609. pe.putInt32(b.id)
  610. err = pe.putString(host)
  611. if err != nil {
  612. return err
  613. }
  614. pe.putInt32(int32(port))
  615. if version >= 1 {
  616. err = pe.putNullableString(b.rack)
  617. if err != nil {
  618. return err
  619. }
  620. }
  621. return nil
  622. }
  623. func (b *Broker) responseReceiver() {
  624. var dead error
  625. header := make([]byte, 8)
  626. for response := range b.responses {
  627. if dead != nil {
  628. response.errors <- dead
  629. continue
  630. }
  631. err := b.conn.SetReadDeadline(time.Now().Add(b.conf.Net.ReadTimeout))
  632. if err != nil {
  633. dead = err
  634. response.errors <- err
  635. continue
  636. }
  637. bytesReadHeader, err := io.ReadFull(b.conn, header)
  638. requestLatency := time.Since(response.requestTime)
  639. if err != nil {
  640. b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
  641. dead = err
  642. response.errors <- err
  643. continue
  644. }
  645. decodedHeader := responseHeader{}
  646. err = decode(header, &decodedHeader)
  647. if err != nil {
  648. b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
  649. dead = err
  650. response.errors <- err
  651. continue
  652. }
  653. if decodedHeader.correlationID != response.correlationID {
  654. b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
  655. // TODO if decoded ID < cur ID, discard until we catch up
  656. // TODO if decoded ID > cur ID, save it so when cur ID catches up we have a response
  657. dead = PacketDecodingError{fmt.Sprintf("correlation ID didn't match, wanted %d, got %d", response.correlationID, decodedHeader.correlationID)}
  658. response.errors <- dead
  659. continue
  660. }
  661. buf := make([]byte, decodedHeader.length-4)
  662. bytesReadBody, err := io.ReadFull(b.conn, buf)
  663. b.updateIncomingCommunicationMetrics(bytesReadHeader+bytesReadBody, requestLatency)
  664. if err != nil {
  665. dead = err
  666. response.errors <- err
  667. continue
  668. }
  669. response.packets <- buf
  670. }
  671. close(b.done)
  672. }
  673. func (b *Broker) authenticateViaSASL() error {
  674. switch b.conf.Net.SASL.Mechanism {
  675. case SASLTypeOAuth:
  676. return b.sendAndReceiveSASLOAuth(b.conf.Net.SASL.TokenProvider)
  677. case SASLTypeSCRAMSHA256, SASLTypeSCRAMSHA512:
  678. return b.sendAndReceiveSASLSCRAMv1()
  679. case SASLTypeGSSAPI:
  680. return b.sendAndReceiveKerberos()
  681. default:
  682. return b.sendAndReceiveSASLPlainAuth()
  683. }
  684. }
  685. func (b *Broker) sendAndReceiveKerberos() error {
  686. b.kerberosAuthenticator.Config = &b.conf.Net.SASL.GSSAPI
  687. if b.kerberosAuthenticator.NewKerberosClientFunc == nil {
  688. b.kerberosAuthenticator.NewKerberosClientFunc = NewKerberosClient
  689. }
  690. return b.kerberosAuthenticator.Authorize(b)
  691. }
  692. func (b *Broker) sendAndReceiveSASLHandshake(saslType SASLMechanism, version int16) error {
  693. rb := &SaslHandshakeRequest{Mechanism: string(saslType), Version: version}
  694. req := &request{correlationID: b.correlationID, clientID: b.conf.ClientID, body: rb}
  695. buf, err := encode(req, b.conf.MetricRegistry)
  696. if err != nil {
  697. return err
  698. }
  699. err = b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout))
  700. if err != nil {
  701. return err
  702. }
  703. requestTime := time.Now()
  704. bytes, err := b.conn.Write(buf)
  705. b.updateOutgoingCommunicationMetrics(bytes)
  706. if err != nil {
  707. Logger.Printf("Failed to send SASL handshake %s: %s\n", b.addr, err.Error())
  708. return err
  709. }
  710. b.correlationID++
  711. //wait for the response
  712. header := make([]byte, 8) // response header
  713. _, err = io.ReadFull(b.conn, header)
  714. if err != nil {
  715. Logger.Printf("Failed to read SASL handshake header : %s\n", err.Error())
  716. return err
  717. }
  718. length := binary.BigEndian.Uint32(header[:4])
  719. payload := make([]byte, length-4)
  720. n, err := io.ReadFull(b.conn, payload)
  721. if err != nil {
  722. Logger.Printf("Failed to read SASL handshake payload : %s\n", err.Error())
  723. return err
  724. }
  725. b.updateIncomingCommunicationMetrics(n+8, time.Since(requestTime))
  726. res := &SaslHandshakeResponse{}
  727. err = versionedDecode(payload, res, 0)
  728. if err != nil {
  729. Logger.Printf("Failed to parse SASL handshake : %s\n", err.Error())
  730. return err
  731. }
  732. if res.Err != ErrNoError {
  733. Logger.Printf("Invalid SASL Mechanism : %s\n", res.Err.Error())
  734. return res.Err
  735. }
  736. Logger.Print("Successful SASL handshake. Available mechanisms: ", res.EnabledMechanisms)
  737. return nil
  738. }
  739. // Kafka 0.10.x supported SASL PLAIN/Kerberos via KAFKA-3149 (KIP-43).
  740. // Kafka 1.x.x onward added a SaslAuthenticate request/response message which
  741. // wraps the SASL flow in the Kafka protocol, which allows for returning
  742. // meaningful errors on authentication failure.
  743. //
  744. // In SASL Plain, Kafka expects the auth header to be in the following format
  745. // Message format (from https://tools.ietf.org/html/rfc4616):
  746. //
  747. // message = [authzid] UTF8NUL authcid UTF8NUL passwd
  748. // authcid = 1*SAFE ; MUST accept up to 255 octets
  749. // authzid = 1*SAFE ; MUST accept up to 255 octets
  750. // passwd = 1*SAFE ; MUST accept up to 255 octets
  751. // UTF8NUL = %x00 ; UTF-8 encoded NUL character
  752. //
  753. // SAFE = UTF1 / UTF2 / UTF3 / UTF4
  754. // ;; any UTF-8 encoded Unicode character except NUL
  755. //
  756. // With SASL v0 handshake and auth then:
  757. // When credentials are valid, Kafka returns a 4 byte array of null characters.
  758. // When credentials are invalid, Kafka closes the connection.
  759. //
  760. // With SASL v1 handshake and auth then:
  761. // When credentials are invalid, Kafka replies with a SaslAuthenticate response
  762. // containing an error code and message detailing the authentication failure.
  763. func (b *Broker) sendAndReceiveSASLPlainAuth() error {
  764. // default to V0 to allow for backward compatability when SASL is enabled
  765. // but not the handshake
  766. if b.conf.Net.SASL.Handshake {
  767. handshakeErr := b.sendAndReceiveSASLHandshake(SASLTypePlaintext, b.conf.Net.SASL.Version)
  768. if handshakeErr != nil {
  769. Logger.Printf("Error while performing SASL handshake %s\n", b.addr)
  770. return handshakeErr
  771. }
  772. }
  773. if b.conf.Net.SASL.Version == SASLHandshakeV1 {
  774. return b.sendAndReceiveV1SASLPlainAuth()
  775. }
  776. return b.sendAndReceiveV0SASLPlainAuth()
  777. }
  778. // sendAndReceiveV0SASLPlainAuth flows the v0 sasl auth NOT wrapped in the kafka protocol
  779. func (b *Broker) sendAndReceiveV0SASLPlainAuth() error {
  780. length := 1 + len(b.conf.Net.SASL.User) + 1 + len(b.conf.Net.SASL.Password)
  781. authBytes := make([]byte, length+4) //4 byte length header + auth data
  782. binary.BigEndian.PutUint32(authBytes, uint32(length))
  783. copy(authBytes[4:], []byte("\x00"+b.conf.Net.SASL.User+"\x00"+b.conf.Net.SASL.Password))
  784. err := b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout))
  785. if err != nil {
  786. Logger.Printf("Failed to set write deadline when doing SASL auth with broker %s: %s\n", b.addr, err.Error())
  787. return err
  788. }
  789. requestTime := time.Now()
  790. bytesWritten, err := b.conn.Write(authBytes)
  791. b.updateOutgoingCommunicationMetrics(bytesWritten)
  792. if err != nil {
  793. Logger.Printf("Failed to write SASL auth header to broker %s: %s\n", b.addr, err.Error())
  794. return err
  795. }
  796. header := make([]byte, 4)
  797. n, err := io.ReadFull(b.conn, header)
  798. b.updateIncomingCommunicationMetrics(n, time.Since(requestTime))
  799. // If the credentials are valid, we would get a 4 byte response filled with null characters.
  800. // Otherwise, the broker closes the connection and we get an EOF
  801. if err != nil {
  802. Logger.Printf("Failed to read response while authenticating with SASL to broker %s: %s\n", b.addr, err.Error())
  803. return err
  804. }
  805. Logger.Printf("SASL authentication successful with broker %s:%v - %v\n", b.addr, n, header)
  806. return nil
  807. }
  808. // sendAndReceiveV1SASLPlainAuth flows the v1 sasl authentication using the kafka protocol
  809. func (b *Broker) sendAndReceiveV1SASLPlainAuth() error {
  810. correlationID := b.correlationID
  811. requestTime := time.Now()
  812. bytesWritten, err := b.sendSASLPlainAuthClientResponse(correlationID)
  813. b.updateOutgoingCommunicationMetrics(bytesWritten)
  814. if err != nil {
  815. Logger.Printf("Failed to write SASL auth header to broker %s: %s\n", b.addr, err.Error())
  816. return err
  817. }
  818. b.correlationID++
  819. bytesRead, err := b.receiveSASLServerResponse(&SaslAuthenticateResponse{}, correlationID)
  820. b.updateIncomingCommunicationMetrics(bytesRead, time.Since(requestTime))
  821. // With v1 sasl we get an error message set in the response we can return
  822. if err != nil {
  823. Logger.Printf("Error returned from broker during SASL flow %s: %s\n", b.addr, err.Error())
  824. return err
  825. }
  826. return nil
  827. }
  828. // sendAndReceiveSASLOAuth performs the authentication flow as described by KIP-255
  829. // https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=75968876
  830. func (b *Broker) sendAndReceiveSASLOAuth(provider AccessTokenProvider) error {
  831. if err := b.sendAndReceiveSASLHandshake(SASLTypeOAuth, SASLHandshakeV1); err != nil {
  832. return err
  833. }
  834. token, err := provider.Token()
  835. if err != nil {
  836. return err
  837. }
  838. message, err := buildClientFirstMessage(token)
  839. if err != nil {
  840. return err
  841. }
  842. challenged, err := b.sendClientMessage(message)
  843. if err != nil {
  844. return err
  845. }
  846. if challenged {
  847. // Abort the token exchange. The broker returns the failure code.
  848. _, err = b.sendClientMessage([]byte(`\x01`))
  849. }
  850. return err
  851. }
  852. // sendClientMessage sends a SASL/OAUTHBEARER client message and returns true
  853. // if the broker responds with a challenge, in which case the token is
  854. // rejected.
  855. func (b *Broker) sendClientMessage(message []byte) (bool, error) {
  856. requestTime := time.Now()
  857. correlationID := b.correlationID
  858. bytesWritten, err := b.sendSASLOAuthBearerClientMessage(message, correlationID)
  859. if err != nil {
  860. return false, err
  861. }
  862. b.updateOutgoingCommunicationMetrics(bytesWritten)
  863. b.correlationID++
  864. res := &SaslAuthenticateResponse{}
  865. bytesRead, err := b.receiveSASLServerResponse(res, correlationID)
  866. requestLatency := time.Since(requestTime)
  867. b.updateIncomingCommunicationMetrics(bytesRead, requestLatency)
  868. isChallenge := len(res.SaslAuthBytes) > 0
  869. if isChallenge && err != nil {
  870. Logger.Printf("Broker rejected authentication token: %s", res.SaslAuthBytes)
  871. }
  872. return isChallenge, err
  873. }
  874. func (b *Broker) sendAndReceiveSASLSCRAMv1() error {
  875. if err := b.sendAndReceiveSASLHandshake(b.conf.Net.SASL.Mechanism, SASLHandshakeV1); err != nil {
  876. return err
  877. }
  878. scramClient := b.conf.Net.SASL.SCRAMClientGeneratorFunc()
  879. if err := scramClient.Begin(b.conf.Net.SASL.User, b.conf.Net.SASL.Password, b.conf.Net.SASL.SCRAMAuthzID); err != nil {
  880. return fmt.Errorf("failed to start SCRAM exchange with the server: %s", err.Error())
  881. }
  882. msg, err := scramClient.Step("")
  883. if err != nil {
  884. return fmt.Errorf("failed to advance the SCRAM exchange: %s", err.Error())
  885. }
  886. for !scramClient.Done() {
  887. requestTime := time.Now()
  888. correlationID := b.correlationID
  889. bytesWritten, err := b.sendSaslAuthenticateRequest(correlationID, []byte(msg))
  890. if err != nil {
  891. Logger.Printf("Failed to write SASL auth header to broker %s: %s\n", b.addr, err.Error())
  892. return err
  893. }
  894. b.updateOutgoingCommunicationMetrics(bytesWritten)
  895. b.correlationID++
  896. challenge, err := b.receiveSaslAuthenticateResponse(correlationID)
  897. if err != nil {
  898. Logger.Printf("Failed to read response while authenticating with SASL to broker %s: %s\n", b.addr, err.Error())
  899. return err
  900. }
  901. b.updateIncomingCommunicationMetrics(len(challenge), time.Since(requestTime))
  902. msg, err = scramClient.Step(string(challenge))
  903. if err != nil {
  904. Logger.Println("SASL authentication failed", err)
  905. return err
  906. }
  907. }
  908. Logger.Println("SASL authentication succeeded")
  909. return nil
  910. }
  911. func (b *Broker) sendSaslAuthenticateRequest(correlationID int32, msg []byte) (int, error) {
  912. rb := &SaslAuthenticateRequest{msg}
  913. req := &request{correlationID: correlationID, clientID: b.conf.ClientID, body: rb}
  914. buf, err := encode(req, b.conf.MetricRegistry)
  915. if err != nil {
  916. return 0, err
  917. }
  918. if err := b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout)); err != nil {
  919. return 0, err
  920. }
  921. return b.conn.Write(buf)
  922. }
  923. func (b *Broker) receiveSaslAuthenticateResponse(correlationID int32) ([]byte, error) {
  924. buf := make([]byte, responseLengthSize+correlationIDSize)
  925. _, err := io.ReadFull(b.conn, buf)
  926. if err != nil {
  927. return nil, err
  928. }
  929. header := responseHeader{}
  930. err = decode(buf, &header)
  931. if err != nil {
  932. return nil, err
  933. }
  934. if header.correlationID != correlationID {
  935. return nil, fmt.Errorf("correlation ID didn't match, wanted %d, got %d", b.correlationID, header.correlationID)
  936. }
  937. buf = make([]byte, header.length-correlationIDSize)
  938. _, err = io.ReadFull(b.conn, buf)
  939. if err != nil {
  940. return nil, err
  941. }
  942. res := &SaslAuthenticateResponse{}
  943. if err := versionedDecode(buf, res, 0); err != nil {
  944. return nil, err
  945. }
  946. if res.Err != ErrNoError {
  947. return nil, res.Err
  948. }
  949. return res.SaslAuthBytes, nil
  950. }
  951. // Build SASL/OAUTHBEARER initial client response as described by RFC-7628
  952. // https://tools.ietf.org/html/rfc7628
  953. func buildClientFirstMessage(token *AccessToken) ([]byte, error) {
  954. var ext string
  955. if token.Extensions != nil && len(token.Extensions) > 0 {
  956. if _, ok := token.Extensions[SASLExtKeyAuth]; ok {
  957. return []byte{}, fmt.Errorf("the extension `%s` is invalid", SASLExtKeyAuth)
  958. }
  959. ext = "\x01" + mapToString(token.Extensions, "=", "\x01")
  960. }
  961. resp := []byte(fmt.Sprintf("n,,\x01auth=Bearer %s%s\x01\x01", token.Token, ext))
  962. return resp, nil
  963. }
  964. // mapToString returns a list of key-value pairs ordered by key.
  965. // keyValSep separates the key from the value. elemSep separates each pair.
  966. func mapToString(extensions map[string]string, keyValSep string, elemSep string) string {
  967. buf := make([]string, 0, len(extensions))
  968. for k, v := range extensions {
  969. buf = append(buf, k+keyValSep+v)
  970. }
  971. sort.Strings(buf)
  972. return strings.Join(buf, elemSep)
  973. }
  974. func (b *Broker) sendSASLPlainAuthClientResponse(correlationID int32) (int, error) {
  975. authBytes := []byte("\x00" + b.conf.Net.SASL.User + "\x00" + b.conf.Net.SASL.Password)
  976. rb := &SaslAuthenticateRequest{authBytes}
  977. req := &request{correlationID: correlationID, clientID: b.conf.ClientID, body: rb}
  978. buf, err := encode(req, b.conf.MetricRegistry)
  979. if err != nil {
  980. return 0, err
  981. }
  982. err = b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout))
  983. if err != nil {
  984. Logger.Printf("Failed to set write deadline when doing SASL auth with broker %s: %s\n", b.addr, err.Error())
  985. return 0, err
  986. }
  987. return b.conn.Write(buf)
  988. }
  989. func (b *Broker) sendSASLOAuthBearerClientMessage(initialResp []byte, correlationID int32) (int, error) {
  990. rb := &SaslAuthenticateRequest{initialResp}
  991. req := &request{correlationID: correlationID, clientID: b.conf.ClientID, body: rb}
  992. buf, err := encode(req, b.conf.MetricRegistry)
  993. if err != nil {
  994. return 0, err
  995. }
  996. if err := b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout)); err != nil {
  997. return 0, err
  998. }
  999. return b.conn.Write(buf)
  1000. }
  1001. func (b *Broker) receiveSASLServerResponse(res *SaslAuthenticateResponse, correlationID int32) (int, error) {
  1002. buf := make([]byte, responseLengthSize+correlationIDSize)
  1003. bytesRead, err := io.ReadFull(b.conn, buf)
  1004. if err != nil {
  1005. return bytesRead, err
  1006. }
  1007. header := responseHeader{}
  1008. err = decode(buf, &header)
  1009. if err != nil {
  1010. return bytesRead, err
  1011. }
  1012. if header.correlationID != correlationID {
  1013. return bytesRead, fmt.Errorf("correlation ID didn't match, wanted %d, got %d", b.correlationID, header.correlationID)
  1014. }
  1015. buf = make([]byte, header.length-correlationIDSize)
  1016. c, err := io.ReadFull(b.conn, buf)
  1017. bytesRead += c
  1018. if err != nil {
  1019. return bytesRead, err
  1020. }
  1021. if err := versionedDecode(buf, res, 0); err != nil {
  1022. return bytesRead, err
  1023. }
  1024. if res.Err != ErrNoError {
  1025. return bytesRead, res.Err
  1026. }
  1027. return bytesRead, nil
  1028. }
  1029. func (b *Broker) updateIncomingCommunicationMetrics(bytes int, requestLatency time.Duration) {
  1030. b.updateRequestLatencyMetrics(requestLatency)
  1031. b.responseRate.Mark(1)
  1032. if b.brokerResponseRate != nil {
  1033. b.brokerResponseRate.Mark(1)
  1034. }
  1035. responseSize := int64(bytes)
  1036. b.incomingByteRate.Mark(responseSize)
  1037. if b.brokerIncomingByteRate != nil {
  1038. b.brokerIncomingByteRate.Mark(responseSize)
  1039. }
  1040. b.responseSize.Update(responseSize)
  1041. if b.brokerResponseSize != nil {
  1042. b.brokerResponseSize.Update(responseSize)
  1043. }
  1044. }
  1045. func (b *Broker) updateRequestLatencyMetrics(requestLatency time.Duration) {
  1046. requestLatencyInMs := int64(requestLatency / time.Millisecond)
  1047. b.requestLatency.Update(requestLatencyInMs)
  1048. if b.brokerRequestLatency != nil {
  1049. b.brokerRequestLatency.Update(requestLatencyInMs)
  1050. }
  1051. }
  1052. func (b *Broker) updateOutgoingCommunicationMetrics(bytes int) {
  1053. b.requestRate.Mark(1)
  1054. if b.brokerRequestRate != nil {
  1055. b.brokerRequestRate.Mark(1)
  1056. }
  1057. requestSize := int64(bytes)
  1058. b.outgoingByteRate.Mark(requestSize)
  1059. if b.brokerOutgoingByteRate != nil {
  1060. b.brokerOutgoingByteRate.Mark(requestSize)
  1061. }
  1062. b.requestSize.Update(requestSize)
  1063. if b.brokerRequestSize != nil {
  1064. b.brokerRequestSize.Update(requestSize)
  1065. }
  1066. }
  1067. func (b *Broker) registerMetrics() {
  1068. b.brokerIncomingByteRate = b.registerMeter("incoming-byte-rate")
  1069. b.brokerRequestRate = b.registerMeter("request-rate")
  1070. b.brokerRequestSize = b.registerHistogram("request-size")
  1071. b.brokerRequestLatency = b.registerHistogram("request-latency-in-ms")
  1072. b.brokerOutgoingByteRate = b.registerMeter("outgoing-byte-rate")
  1073. b.brokerResponseRate = b.registerMeter("response-rate")
  1074. b.brokerResponseSize = b.registerHistogram("response-size")
  1075. }
  1076. func (b *Broker) unregisterMetrics() {
  1077. for _, name := range b.registeredMetrics {
  1078. b.conf.MetricRegistry.Unregister(name)
  1079. }
  1080. }
  1081. func (b *Broker) registerMeter(name string) metrics.Meter {
  1082. nameForBroker := getMetricNameForBroker(name, b)
  1083. b.registeredMetrics = append(b.registeredMetrics, nameForBroker)
  1084. return metrics.GetOrRegisterMeter(nameForBroker, b.conf.MetricRegistry)
  1085. }
  1086. func (b *Broker) registerHistogram(name string) metrics.Histogram {
  1087. nameForBroker := getMetricNameForBroker(name, b)
  1088. b.registeredMetrics = append(b.registeredMetrics, nameForBroker)
  1089. return getOrRegisterHistogram(nameForBroker, b.conf.MetricRegistry)
  1090. }