broker.go 36 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335
  1. package sarama
  2. import (
  3. "crypto/tls"
  4. "encoding/binary"
  5. "fmt"
  6. "io"
  7. "net"
  8. "sort"
  9. "strconv"
  10. "strings"
  11. "sync"
  12. "sync/atomic"
  13. "time"
  14. metrics "github.com/rcrowley/go-metrics"
  15. )
  16. // Broker represents a single Kafka broker connection. All operations on this object are entirely concurrency-safe.
  17. type Broker struct {
  18. conf *Config
  19. rack *string
  20. id int32
  21. addr string
  22. correlationID int32
  23. conn net.Conn
  24. connErr error
  25. lock sync.Mutex
  26. opened int32
  27. responses chan responsePromise
  28. done chan bool
  29. registeredMetrics []string
  30. incomingByteRate metrics.Meter
  31. requestRate metrics.Meter
  32. requestSize metrics.Histogram
  33. requestLatency metrics.Histogram
  34. outgoingByteRate metrics.Meter
  35. responseRate metrics.Meter
  36. responseSize metrics.Histogram
  37. brokerIncomingByteRate metrics.Meter
  38. brokerRequestRate metrics.Meter
  39. brokerRequestSize metrics.Histogram
  40. brokerRequestLatency metrics.Histogram
  41. brokerOutgoingByteRate metrics.Meter
  42. brokerResponseRate metrics.Meter
  43. brokerResponseSize metrics.Histogram
  44. }
  // SASLMechanism names the SASL mechanism the client uses to authenticate
  // with the broker.
  type SASLMechanism string

  // Supported SASL mechanism names and related protocol constants.
  const (
  	// SASLTypeOAuth represents the SASL/OAUTHBEARER mechanism (Kafka 2.0.0+)
  	SASLTypeOAuth = "OAUTHBEARER"

  	// SASLTypePlaintext represents the SASL/PLAIN mechanism
  	SASLTypePlaintext = "PLAIN"

  	// SASLTypeSCRAMSHA256 represents the SCRAM-SHA-256 mechanism.
  	SASLTypeSCRAMSHA256 = "SCRAM-SHA-256"

  	// SASLTypeSCRAMSHA512 represents the SCRAM-SHA-512 mechanism.
  	SASLTypeSCRAMSHA512 = "SCRAM-SHA-512"

  	// SASLHandshakeV0 is v0 of the Kafka SASL handshake protocol. Client and
  	// server negotiate SASL auth using opaque packets.
  	SASLHandshakeV0 int16 = 0

  	// SASLHandshakeV1 is v1 of the Kafka SASL handshake protocol. Client and
  	// server negotiate SASL by wrapping tokens with Kafka protocol headers.
  	SASLHandshakeV1 int16 = 1

  	// SASLExtKeyAuth is the reserved extension key name sent as part of the
  	// SASL/OAUTHBEARER initial client response
  	SASLExtKeyAuth = "auth"
  )
  // AccessToken bundles the token used to authenticate a SASL/OAUTHBEARER
  // client together with its associated metadata.
  type AccessToken struct {
  	// Token is the access token payload.
  	Token string

  	// Extensions is an optional map of arbitrary key-value pairs that can be
  	// sent with the SASL/OAUTHBEARER initial client response. These values are
  	// ignored by the SASL server if they are unexpected. This feature is only
  	// supported by Kafka >= 2.1.0.
  	Extensions map[string]string
  }
  77. // AccessTokenProvider is the interface that encapsulates how implementors
  78. // can generate access tokens for Kafka broker authentication.
  79. type AccessTokenProvider interface {
  80. // Token returns an access token. The implementation should ensure token
  81. // reuse so that multiple calls at connect time do not create multiple
  82. // tokens. The implementation should also periodically refresh the token in
  83. // order to guarantee that each call returns an unexpired token. This
  84. // method should not block indefinitely--a timeout error should be returned
  85. // after a short period of inactivity so that the broker connection logic
  86. // can log debugging information and retry.
  87. Token() (*AccessToken, error)
  88. }
  // SCRAMClient is an interface to a SCRAM client implementation.
  type SCRAMClient interface {
  	// Begin prepares the client for the SCRAM exchange with the server,
  	// using the given user name and password.
  	Begin(userName, password, authzID string) error

  	// Step advances the client through the SCRAM exchange. It is called
  	// repeatedly until it returns an error or `Done` returns true.
  	Step(challenge string) (response string, err error)

  	// Done should return true once the SCRAM conversation is over.
  	Done() bool
  }
  // responsePromise tracks one in-flight request: it is queued by send and
  // fulfilled by responseReceiver with either a raw response packet or an
  // error. The field order matters because send builds it positionally.
  type responsePromise struct {
  	requestTime   time.Time   // when the request was written, used for latency metrics
  	correlationID int32       // correlation ID the matching response must carry
  	packets       chan []byte // receives the response body on success
  	errors        chan error  // receives the failure otherwise
  }
  108. // NewBroker creates and returns a Broker targeting the given host:port address.
  109. // This does not attempt to actually connect, you have to call Open() for that.
  110. func NewBroker(addr string) *Broker {
  111. return &Broker{id: -1, addr: addr}
  112. }
  113. // Open tries to connect to the Broker if it is not already connected or connecting, but does not block
  114. // waiting for the connection to complete. This means that any subsequent operations on the broker will
  115. // block waiting for the connection to succeed or fail. To get the effect of a fully synchronous Open call,
  116. // follow it by a call to Connected(). The only errors Open will return directly are ConfigurationError or
  117. // AlreadyConnected. If conf is nil, the result of NewConfig() is used.
  118. func (b *Broker) Open(conf *Config) error {
  119. if !atomic.CompareAndSwapInt32(&b.opened, 0, 1) {
  120. return ErrAlreadyConnected
  121. }
  122. if conf == nil {
  123. conf = NewConfig()
  124. }
  125. err := conf.Validate()
  126. if err != nil {
  127. return err
  128. }
  129. b.lock.Lock()
  130. go withRecover(func() {
  131. defer b.lock.Unlock()
  132. dialer := net.Dialer{
  133. Timeout: conf.Net.DialTimeout,
  134. KeepAlive: conf.Net.KeepAlive,
  135. LocalAddr: conf.Net.LocalAddr,
  136. }
  137. if conf.Net.TLS.Enable {
  138. b.conn, b.connErr = tls.DialWithDialer(&dialer, "tcp", b.addr, conf.Net.TLS.Config)
  139. } else if conf.Net.Proxy.Enable {
  140. b.conn, b.connErr = conf.Net.Proxy.Dialer.Dial("tcp", b.addr)
  141. } else {
  142. b.conn, b.connErr = dialer.Dial("tcp", b.addr)
  143. }
  144. if b.connErr != nil {
  145. Logger.Printf("Failed to connect to broker %s: %s\n", b.addr, b.connErr)
  146. b.conn = nil
  147. atomic.StoreInt32(&b.opened, 0)
  148. return
  149. }
  150. b.conn = newBufConn(b.conn)
  151. b.conf = conf
  152. // Create or reuse the global metrics shared between brokers
  153. b.incomingByteRate = metrics.GetOrRegisterMeter("incoming-byte-rate", conf.MetricRegistry)
  154. b.requestRate = metrics.GetOrRegisterMeter("request-rate", conf.MetricRegistry)
  155. b.requestSize = getOrRegisterHistogram("request-size", conf.MetricRegistry)
  156. b.requestLatency = getOrRegisterHistogram("request-latency-in-ms", conf.MetricRegistry)
  157. b.outgoingByteRate = metrics.GetOrRegisterMeter("outgoing-byte-rate", conf.MetricRegistry)
  158. b.responseRate = metrics.GetOrRegisterMeter("response-rate", conf.MetricRegistry)
  159. b.responseSize = getOrRegisterHistogram("response-size", conf.MetricRegistry)
  160. // Do not gather metrics for seeded broker (only used during bootstrap) because they share
  161. // the same id (-1) and are already exposed through the global metrics above
  162. if b.id >= 0 {
  163. b.registerMetrics()
  164. }
  165. if conf.Net.SASL.Enable {
  166. b.connErr = b.authenticateViaSASL()
  167. if b.connErr != nil {
  168. err = b.conn.Close()
  169. if err == nil {
  170. Logger.Printf("Closed connection to broker %s\n", b.addr)
  171. } else {
  172. Logger.Printf("Error while closing connection to broker %s: %s\n", b.addr, err)
  173. }
  174. b.conn = nil
  175. atomic.StoreInt32(&b.opened, 0)
  176. return
  177. }
  178. }
  179. b.done = make(chan bool)
  180. b.responses = make(chan responsePromise, b.conf.Net.MaxOpenRequests-1)
  181. if b.id >= 0 {
  182. Logger.Printf("Connected to broker at %s (registered as #%d)\n", b.addr, b.id)
  183. } else {
  184. Logger.Printf("Connected to broker at %s (unregistered)\n", b.addr)
  185. }
  186. go withRecover(b.responseReceiver)
  187. })
  188. return nil
  189. }
  190. // Connected returns true if the broker is connected and false otherwise. If the broker is not
  191. // connected but it had tried to connect, the error from that connection attempt is also returned.
  192. func (b *Broker) Connected() (bool, error) {
  193. b.lock.Lock()
  194. defer b.lock.Unlock()
  195. return b.conn != nil, b.connErr
  196. }
  197. //Close closes the broker resources
  198. func (b *Broker) Close() error {
  199. b.lock.Lock()
  200. defer b.lock.Unlock()
  201. if b.conn == nil {
  202. return ErrNotConnected
  203. }
  204. close(b.responses)
  205. <-b.done
  206. err := b.conn.Close()
  207. b.conn = nil
  208. b.connErr = nil
  209. b.done = nil
  210. b.responses = nil
  211. b.unregisterMetrics()
  212. if err == nil {
  213. Logger.Printf("Closed connection to broker %s\n", b.addr)
  214. } else {
  215. Logger.Printf("Error while closing connection to broker %s: %s\n", b.addr, err)
  216. }
  217. atomic.StoreInt32(&b.opened, 0)
  218. return err
  219. }
  220. // ID returns the broker ID retrieved from Kafka's metadata, or -1 if that is not known.
  221. func (b *Broker) ID() int32 {
  222. return b.id
  223. }
  224. // Addr returns the broker address as either retrieved from Kafka's metadata or passed to NewBroker.
  225. func (b *Broker) Addr() string {
  226. return b.addr
  227. }
  228. // Rack returns the broker's rack as retrieved from Kafka's metadata or the
  229. // empty string if it is not known. The returned value corresponds to the
  230. // broker's broker.rack configuration setting. Requires protocol version to be
  231. // at least v0.10.0.0.
  232. func (b *Broker) Rack() string {
  233. if b.rack == nil {
  234. return ""
  235. }
  236. return *b.rack
  237. }
  238. //GetMetadata send a metadata request and returns a metadata response or error
  239. func (b *Broker) GetMetadata(request *MetadataRequest) (*MetadataResponse, error) {
  240. response := new(MetadataResponse)
  241. err := b.sendAndReceive(request, response)
  242. if err != nil {
  243. return nil, err
  244. }
  245. return response, nil
  246. }
  247. //GetConsumerMetadata send a consumer metadata request and returns a consumer metadata response or error
  248. func (b *Broker) GetConsumerMetadata(request *ConsumerMetadataRequest) (*ConsumerMetadataResponse, error) {
  249. response := new(ConsumerMetadataResponse)
  250. err := b.sendAndReceive(request, response)
  251. if err != nil {
  252. return nil, err
  253. }
  254. return response, nil
  255. }
  256. //FindCoordinator sends a find coordinate request and returns a response or error
  257. func (b *Broker) FindCoordinator(request *FindCoordinatorRequest) (*FindCoordinatorResponse, error) {
  258. response := new(FindCoordinatorResponse)
  259. err := b.sendAndReceive(request, response)
  260. if err != nil {
  261. return nil, err
  262. }
  263. return response, nil
  264. }
  265. //GetAvailableOffsets return an offset response or error
  266. func (b *Broker) GetAvailableOffsets(request *OffsetRequest) (*OffsetResponse, error) {
  267. response := new(OffsetResponse)
  268. err := b.sendAndReceive(request, response)
  269. if err != nil {
  270. return nil, err
  271. }
  272. return response, nil
  273. }
  274. //Produce returns a produce response or error
  275. func (b *Broker) Produce(request *ProduceRequest) (*ProduceResponse, error) {
  276. var (
  277. response *ProduceResponse
  278. err error
  279. )
  280. if request.RequiredAcks == NoResponse {
  281. err = b.sendAndReceive(request, nil)
  282. } else {
  283. response = new(ProduceResponse)
  284. err = b.sendAndReceive(request, response)
  285. }
  286. if err != nil {
  287. return nil, err
  288. }
  289. return response, nil
  290. }
  291. //Fetch returns a FetchResponse or error
  292. func (b *Broker) Fetch(request *FetchRequest) (*FetchResponse, error) {
  293. response := new(FetchResponse)
  294. err := b.sendAndReceive(request, response)
  295. if err != nil {
  296. return nil, err
  297. }
  298. return response, nil
  299. }
  300. //CommitOffset return an Offset commit reponse or error
  301. func (b *Broker) CommitOffset(request *OffsetCommitRequest) (*OffsetCommitResponse, error) {
  302. response := new(OffsetCommitResponse)
  303. err := b.sendAndReceive(request, response)
  304. if err != nil {
  305. return nil, err
  306. }
  307. return response, nil
  308. }
  309. //FetchOffset returns an offset fetch response or error
  310. func (b *Broker) FetchOffset(request *OffsetFetchRequest) (*OffsetFetchResponse, error) {
  311. response := new(OffsetFetchResponse)
  312. err := b.sendAndReceive(request, response)
  313. if err != nil {
  314. return nil, err
  315. }
  316. return response, nil
  317. }
  318. //JoinGroup returns a join group response or error
  319. func (b *Broker) JoinGroup(request *JoinGroupRequest) (*JoinGroupResponse, error) {
  320. response := new(JoinGroupResponse)
  321. err := b.sendAndReceive(request, response)
  322. if err != nil {
  323. return nil, err
  324. }
  325. return response, nil
  326. }
  327. //SyncGroup returns a sync group response or error
  328. func (b *Broker) SyncGroup(request *SyncGroupRequest) (*SyncGroupResponse, error) {
  329. response := new(SyncGroupResponse)
  330. err := b.sendAndReceive(request, response)
  331. if err != nil {
  332. return nil, err
  333. }
  334. return response, nil
  335. }
  336. //LeaveGroup return a leave group response or error
  337. func (b *Broker) LeaveGroup(request *LeaveGroupRequest) (*LeaveGroupResponse, error) {
  338. response := new(LeaveGroupResponse)
  339. err := b.sendAndReceive(request, response)
  340. if err != nil {
  341. return nil, err
  342. }
  343. return response, nil
  344. }
  345. //Heartbeat returns a heartbeat response or error
  346. func (b *Broker) Heartbeat(request *HeartbeatRequest) (*HeartbeatResponse, error) {
  347. response := new(HeartbeatResponse)
  348. err := b.sendAndReceive(request, response)
  349. if err != nil {
  350. return nil, err
  351. }
  352. return response, nil
  353. }
  354. //ListGroups return a list group response or error
  355. func (b *Broker) ListGroups(request *ListGroupsRequest) (*ListGroupsResponse, error) {
  356. response := new(ListGroupsResponse)
  357. err := b.sendAndReceive(request, response)
  358. if err != nil {
  359. return nil, err
  360. }
  361. return response, nil
  362. }
  363. //DescribeGroups return describe group response or error
  364. func (b *Broker) DescribeGroups(request *DescribeGroupsRequest) (*DescribeGroupsResponse, error) {
  365. response := new(DescribeGroupsResponse)
  366. err := b.sendAndReceive(request, response)
  367. if err != nil {
  368. return nil, err
  369. }
  370. return response, nil
  371. }
  372. //ApiVersions return api version response or error
  373. func (b *Broker) ApiVersions(request *ApiVersionsRequest) (*ApiVersionsResponse, error) {
  374. response := new(ApiVersionsResponse)
  375. err := b.sendAndReceive(request, response)
  376. if err != nil {
  377. return nil, err
  378. }
  379. return response, nil
  380. }
  381. //CreateTopics send a create topic request and returns create topic response
  382. func (b *Broker) CreateTopics(request *CreateTopicsRequest) (*CreateTopicsResponse, error) {
  383. response := new(CreateTopicsResponse)
  384. err := b.sendAndReceive(request, response)
  385. if err != nil {
  386. return nil, err
  387. }
  388. return response, nil
  389. }
  390. //DeleteTopics sends a delete topic request and returns delete topic response
  391. func (b *Broker) DeleteTopics(request *DeleteTopicsRequest) (*DeleteTopicsResponse, error) {
  392. response := new(DeleteTopicsResponse)
  393. err := b.sendAndReceive(request, response)
  394. if err != nil {
  395. return nil, err
  396. }
  397. return response, nil
  398. }
  399. //CreatePartitions sends a create partition request and returns create
  400. //partitions response or error
  401. func (b *Broker) CreatePartitions(request *CreatePartitionsRequest) (*CreatePartitionsResponse, error) {
  402. response := new(CreatePartitionsResponse)
  403. err := b.sendAndReceive(request, response)
  404. if err != nil {
  405. return nil, err
  406. }
  407. return response, nil
  408. }
  409. //DeleteRecords send a request to delete records and return delete record
  410. //response or error
  411. func (b *Broker) DeleteRecords(request *DeleteRecordsRequest) (*DeleteRecordsResponse, error) {
  412. response := new(DeleteRecordsResponse)
  413. err := b.sendAndReceive(request, response)
  414. if err != nil {
  415. return nil, err
  416. }
  417. return response, nil
  418. }
  419. //DescribeAcls sends a describe acl request and returns a response or error
  420. func (b *Broker) DescribeAcls(request *DescribeAclsRequest) (*DescribeAclsResponse, error) {
  421. response := new(DescribeAclsResponse)
  422. err := b.sendAndReceive(request, response)
  423. if err != nil {
  424. return nil, err
  425. }
  426. return response, nil
  427. }
  428. //CreateAcls sends a create acl request and returns a response or error
  429. func (b *Broker) CreateAcls(request *CreateAclsRequest) (*CreateAclsResponse, error) {
  430. response := new(CreateAclsResponse)
  431. err := b.sendAndReceive(request, response)
  432. if err != nil {
  433. return nil, err
  434. }
  435. return response, nil
  436. }
  437. //DeleteAcls sends a delete acl request and returns a response or error
  438. func (b *Broker) DeleteAcls(request *DeleteAclsRequest) (*DeleteAclsResponse, error) {
  439. response := new(DeleteAclsResponse)
  440. err := b.sendAndReceive(request, response)
  441. if err != nil {
  442. return nil, err
  443. }
  444. return response, nil
  445. }
  446. //InitProducerID sends an init producer request and returns a response or error
  447. func (b *Broker) InitProducerID(request *InitProducerIDRequest) (*InitProducerIDResponse, error) {
  448. response := new(InitProducerIDResponse)
  449. err := b.sendAndReceive(request, response)
  450. if err != nil {
  451. return nil, err
  452. }
  453. return response, nil
  454. }
  455. //AddPartitionsToTxn send a request to add partition to txn and returns
  456. //a response or error
  457. func (b *Broker) AddPartitionsToTxn(request *AddPartitionsToTxnRequest) (*AddPartitionsToTxnResponse, error) {
  458. response := new(AddPartitionsToTxnResponse)
  459. err := b.sendAndReceive(request, response)
  460. if err != nil {
  461. return nil, err
  462. }
  463. return response, nil
  464. }
  465. //AddOffsetsToTxn sends a request to add offsets to txn and returns a response
  466. //or error
  467. func (b *Broker) AddOffsetsToTxn(request *AddOffsetsToTxnRequest) (*AddOffsetsToTxnResponse, error) {
  468. response := new(AddOffsetsToTxnResponse)
  469. err := b.sendAndReceive(request, response)
  470. if err != nil {
  471. return nil, err
  472. }
  473. return response, nil
  474. }
  475. //EndTxn sends a request to end txn and returns a response or error
  476. func (b *Broker) EndTxn(request *EndTxnRequest) (*EndTxnResponse, error) {
  477. response := new(EndTxnResponse)
  478. err := b.sendAndReceive(request, response)
  479. if err != nil {
  480. return nil, err
  481. }
  482. return response, nil
  483. }
  484. //TxnOffsetCommit sends a request to commit transaction offsets and returns
  485. //a response or error
  486. func (b *Broker) TxnOffsetCommit(request *TxnOffsetCommitRequest) (*TxnOffsetCommitResponse, error) {
  487. response := new(TxnOffsetCommitResponse)
  488. err := b.sendAndReceive(request, response)
  489. if err != nil {
  490. return nil, err
  491. }
  492. return response, nil
  493. }
  494. //DescribeConfigs sends a request to describe config and returns a response or
  495. //error
  496. func (b *Broker) DescribeConfigs(request *DescribeConfigsRequest) (*DescribeConfigsResponse, error) {
  497. response := new(DescribeConfigsResponse)
  498. err := b.sendAndReceive(request, response)
  499. if err != nil {
  500. return nil, err
  501. }
  502. return response, nil
  503. }
  504. //AlterConfigs sends a request to alter config and return a response or error
  505. func (b *Broker) AlterConfigs(request *AlterConfigsRequest) (*AlterConfigsResponse, error) {
  506. response := new(AlterConfigsResponse)
  507. err := b.sendAndReceive(request, response)
  508. if err != nil {
  509. return nil, err
  510. }
  511. return response, nil
  512. }
  513. //DeleteGroups sends a request to delete groups and returns a response or error
  514. func (b *Broker) DeleteGroups(request *DeleteGroupsRequest) (*DeleteGroupsResponse, error) {
  515. response := new(DeleteGroupsResponse)
  516. if err := b.sendAndReceive(request, response); err != nil {
  517. return nil, err
  518. }
  519. return response, nil
  520. }
  521. func (b *Broker) send(rb protocolBody, promiseResponse bool) (*responsePromise, error) {
  522. b.lock.Lock()
  523. defer b.lock.Unlock()
  524. if b.conn == nil {
  525. if b.connErr != nil {
  526. return nil, b.connErr
  527. }
  528. return nil, ErrNotConnected
  529. }
  530. if !b.conf.Version.IsAtLeast(rb.requiredVersion()) {
  531. return nil, ErrUnsupportedVersion
  532. }
  533. req := &request{correlationID: b.correlationID, clientID: b.conf.ClientID, body: rb}
  534. buf, err := encode(req, b.conf.MetricRegistry)
  535. if err != nil {
  536. return nil, err
  537. }
  538. err = b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout))
  539. if err != nil {
  540. return nil, err
  541. }
  542. requestTime := time.Now()
  543. bytes, err := b.conn.Write(buf)
  544. b.updateOutgoingCommunicationMetrics(bytes) //TODO: should it be after error check
  545. if err != nil {
  546. return nil, err
  547. }
  548. b.correlationID++
  549. if !promiseResponse {
  550. // Record request latency without the response
  551. b.updateRequestLatencyMetrics(time.Since(requestTime))
  552. return nil, nil
  553. }
  554. promise := responsePromise{requestTime, req.correlationID, make(chan []byte), make(chan error)}
  555. b.responses <- promise
  556. return &promise, nil
  557. }
  558. func (b *Broker) sendAndReceive(req protocolBody, res versionedDecoder) error {
  559. promise, err := b.send(req, res != nil)
  560. if err != nil {
  561. return err
  562. }
  563. if promise == nil {
  564. return nil
  565. }
  566. select {
  567. case buf := <-promise.packets:
  568. return versionedDecode(buf, res, req.version())
  569. case err = <-promise.errors:
  570. return err
  571. }
  572. }
  573. func (b *Broker) decode(pd packetDecoder, version int16) (err error) {
  574. b.id, err = pd.getInt32()
  575. if err != nil {
  576. return err
  577. }
  578. host, err := pd.getString()
  579. if err != nil {
  580. return err
  581. }
  582. port, err := pd.getInt32()
  583. if err != nil {
  584. return err
  585. }
  586. if version >= 1 {
  587. b.rack, err = pd.getNullableString()
  588. if err != nil {
  589. return err
  590. }
  591. }
  592. b.addr = net.JoinHostPort(host, fmt.Sprint(port))
  593. if _, _, err := net.SplitHostPort(b.addr); err != nil {
  594. return err
  595. }
  596. return nil
  597. }
  598. func (b *Broker) encode(pe packetEncoder, version int16) (err error) {
  599. host, portstr, err := net.SplitHostPort(b.addr)
  600. if err != nil {
  601. return err
  602. }
  603. port, err := strconv.Atoi(portstr)
  604. if err != nil {
  605. return err
  606. }
  607. pe.putInt32(b.id)
  608. err = pe.putString(host)
  609. if err != nil {
  610. return err
  611. }
  612. pe.putInt32(int32(port))
  613. if version >= 1 {
  614. err = pe.putNullableString(b.rack)
  615. if err != nil {
  616. return err
  617. }
  618. }
  619. return nil
  620. }
  621. func (b *Broker) responseReceiver() {
  622. var dead error
  623. header := make([]byte, 8)
  624. for response := range b.responses {
  625. if dead != nil {
  626. response.errors <- dead
  627. continue
  628. }
  629. err := b.conn.SetReadDeadline(time.Now().Add(b.conf.Net.ReadTimeout))
  630. if err != nil {
  631. dead = err
  632. response.errors <- err
  633. continue
  634. }
  635. bytesReadHeader, err := io.ReadFull(b.conn, header)
  636. requestLatency := time.Since(response.requestTime)
  637. if err != nil {
  638. b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
  639. dead = err
  640. response.errors <- err
  641. continue
  642. }
  643. decodedHeader := responseHeader{}
  644. err = decode(header, &decodedHeader)
  645. if err != nil {
  646. b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
  647. dead = err
  648. response.errors <- err
  649. continue
  650. }
  651. if decodedHeader.correlationID != response.correlationID {
  652. b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
  653. // TODO if decoded ID < cur ID, discard until we catch up
  654. // TODO if decoded ID > cur ID, save it so when cur ID catches up we have a response
  655. dead = PacketDecodingError{fmt.Sprintf("correlation ID didn't match, wanted %d, got %d", response.correlationID, decodedHeader.correlationID)}
  656. response.errors <- dead
  657. continue
  658. }
  659. buf := make([]byte, decodedHeader.length-4)
  660. bytesReadBody, err := io.ReadFull(b.conn, buf)
  661. b.updateIncomingCommunicationMetrics(bytesReadHeader+bytesReadBody, requestLatency)
  662. if err != nil {
  663. dead = err
  664. response.errors <- err
  665. continue
  666. }
  667. response.packets <- buf
  668. }
  669. close(b.done)
  670. }
  671. func (b *Broker) authenticateViaSASL() error {
  672. switch b.conf.Net.SASL.Mechanism {
  673. case SASLTypeOAuth:
  674. return b.sendAndReceiveSASLOAuth(b.conf.Net.SASL.TokenProvider)
  675. case SASLTypeSCRAMSHA256, SASLTypeSCRAMSHA512:
  676. return b.sendAndReceiveSASLSCRAMv1()
  677. default:
  678. return b.sendAndReceiveSASLPlainAuth()
  679. }
  680. }
  681. func (b *Broker) sendAndReceiveSASLHandshake(saslType SASLMechanism, version int16) error {
  682. rb := &SaslHandshakeRequest{Mechanism: string(saslType), Version: version}
  683. req := &request{correlationID: b.correlationID, clientID: b.conf.ClientID, body: rb}
  684. buf, err := encode(req, b.conf.MetricRegistry)
  685. if err != nil {
  686. return err
  687. }
  688. err = b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout))
  689. if err != nil {
  690. return err
  691. }
  692. requestTime := time.Now()
  693. bytes, err := b.conn.Write(buf)
  694. b.updateOutgoingCommunicationMetrics(bytes)
  695. if err != nil {
  696. Logger.Printf("Failed to send SASL handshake %s: %s\n", b.addr, err.Error())
  697. return err
  698. }
  699. b.correlationID++
  700. //wait for the response
  701. header := make([]byte, 8) // response header
  702. _, err = io.ReadFull(b.conn, header)
  703. if err != nil {
  704. Logger.Printf("Failed to read SASL handshake header : %s\n", err.Error())
  705. return err
  706. }
  707. length := binary.BigEndian.Uint32(header[:4])
  708. payload := make([]byte, length-4)
  709. n, err := io.ReadFull(b.conn, payload)
  710. if err != nil {
  711. Logger.Printf("Failed to read SASL handshake payload : %s\n", err.Error())
  712. return err
  713. }
  714. b.updateIncomingCommunicationMetrics(n+8, time.Since(requestTime))
  715. res := &SaslHandshakeResponse{}
  716. err = versionedDecode(payload, res, 0)
  717. if err != nil {
  718. Logger.Printf("Failed to parse SASL handshake : %s\n", err.Error())
  719. return err
  720. }
  721. if res.Err != ErrNoError {
  722. Logger.Printf("Invalid SASL Mechanism : %s\n", res.Err.Error())
  723. return res.Err
  724. }
  725. Logger.Print("Successful SASL handshake. Available mechanisms: ", res.EnabledMechanisms)
  726. return nil
  727. }
  728. // Kafka 0.10.x supported SASL PLAIN/Kerberos via KAFKA-3149 (KIP-43).
  729. // Kafka 1.x.x onward added a SaslAuthenticate request/response message which
  730. // wraps the SASL flow in the Kafka protocol, which allows for returning
  731. // meaningful errors on authentication failure.
  732. //
  733. // In SASL Plain, Kafka expects the auth header to be in the following format
  734. // Message format (from https://tools.ietf.org/html/rfc4616):
  735. //
  736. // message = [authzid] UTF8NUL authcid UTF8NUL passwd
  737. // authcid = 1*SAFE ; MUST accept up to 255 octets
  738. // authzid = 1*SAFE ; MUST accept up to 255 octets
  739. // passwd = 1*SAFE ; MUST accept up to 255 octets
  740. // UTF8NUL = %x00 ; UTF-8 encoded NUL character
  741. //
  742. // SAFE = UTF1 / UTF2 / UTF3 / UTF4
  743. // ;; any UTF-8 encoded Unicode character except NUL
  744. //
  745. // With SASL v0 handshake and auth then:
  746. // When credentials are valid, Kafka returns a 4 byte array of null characters.
  747. // When credentials are invalid, Kafka closes the connection.
  748. //
  749. // With SASL v1 handshake and auth then:
  750. // When credentials are invalid, Kafka replies with a SaslAuthenticate response
  751. // containing an error code and message detailing the authentication failure.
  752. func (b *Broker) sendAndReceiveSASLPlainAuth() error {
  753. // default to V0 to allow for backward compatability when SASL is enabled
  754. // but not the handshake
  755. saslHandshake := SASLHandshakeV0
  756. if b.conf.Net.SASL.Handshake {
  757. if b.conf.Version.IsAtLeast(V1_0_0_0) {
  758. saslHandshake = SASLHandshakeV1
  759. }
  760. handshakeErr := b.sendAndReceiveSASLHandshake(SASLTypePlaintext, saslHandshake)
  761. if handshakeErr != nil {
  762. Logger.Printf("Error while performing SASL handshake %s\n", b.addr)
  763. return handshakeErr
  764. }
  765. }
  766. if saslHandshake == SASLHandshakeV1 {
  767. return b.sendAndReceiveV1SASLPlainAuth()
  768. }
  769. return b.sendAndReceiveV0SASLPlainAuth()
  770. }
  771. // sendAndReceiveV0SASLPlainAuth flows the v0 sasl auth NOT wrapped in the kafka protocol
  772. func (b *Broker) sendAndReceiveV0SASLPlainAuth() error {
  773. length := 1 + len(b.conf.Net.SASL.User) + 1 + len(b.conf.Net.SASL.Password)
  774. authBytes := make([]byte, length+4) //4 byte length header + auth data
  775. binary.BigEndian.PutUint32(authBytes, uint32(length))
  776. copy(authBytes[4:], []byte("\x00"+b.conf.Net.SASL.User+"\x00"+b.conf.Net.SASL.Password))
  777. err := b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout))
  778. if err != nil {
  779. Logger.Printf("Failed to set write deadline when doing SASL auth with broker %s: %s\n", b.addr, err.Error())
  780. return err
  781. }
  782. requestTime := time.Now()
  783. bytesWritten, err := b.conn.Write(authBytes)
  784. b.updateOutgoingCommunicationMetrics(bytesWritten)
  785. if err != nil {
  786. Logger.Printf("Failed to write SASL auth header to broker %s: %s\n", b.addr, err.Error())
  787. return err
  788. }
  789. header := make([]byte, 4)
  790. n, err := io.ReadFull(b.conn, header)
  791. b.updateIncomingCommunicationMetrics(n, time.Since(requestTime))
  792. // If the credentials are valid, we would get a 4 byte response filled with null characters.
  793. // Otherwise, the broker closes the connection and we get an EOF
  794. if err != nil {
  795. Logger.Printf("Failed to read response while authenticating with SASL to broker %s: %s\n", b.addr, err.Error())
  796. return err
  797. }
  798. Logger.Printf("SASL authentication successful with broker %s:%v - %v\n", b.addr, n, header)
  799. return nil
  800. }
  801. // sendAndReceiveV1SASLPlainAuth flows the v1 sasl authentication using the kafka protocol
  802. func (b *Broker) sendAndReceiveV1SASLPlainAuth() error {
  803. correlationID := b.correlationID
  804. requestTime := time.Now()
  805. bytesWritten, err := b.sendSASLPlainAuthClientResponse(correlationID)
  806. b.updateOutgoingCommunicationMetrics(bytesWritten)
  807. if err != nil {
  808. Logger.Printf("Failed to write SASL auth header to broker %s: %s\n", b.addr, err.Error())
  809. return err
  810. }
  811. b.correlationID++
  812. bytesRead, err := b.receiveSASLServerResponse(correlationID)
  813. b.updateIncomingCommunicationMetrics(bytesRead, time.Since(requestTime))
  814. // With v1 sasl we get an error message set in the response we can return
  815. if err != nil {
  816. Logger.Printf("Error returned from broker during SASL flow %s: %s\n", b.addr, err.Error())
  817. return err
  818. }
  819. return nil
  820. }
  821. // sendAndReceiveSASLOAuth performs the authentication flow as described by KIP-255
  822. // https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=75968876
  823. func (b *Broker) sendAndReceiveSASLOAuth(provider AccessTokenProvider) error {
  824. if err := b.sendAndReceiveSASLHandshake(SASLTypeOAuth, SASLHandshakeV1); err != nil {
  825. return err
  826. }
  827. token, err := provider.Token()
  828. if err != nil {
  829. return err
  830. }
  831. requestTime := time.Now()
  832. correlationID := b.correlationID
  833. bytesWritten, err := b.sendSASLOAuthBearerClientResponse(token, correlationID)
  834. if err != nil {
  835. return err
  836. }
  837. b.updateOutgoingCommunicationMetrics(bytesWritten)
  838. b.correlationID++
  839. bytesRead, err := b.receiveSASLServerResponse(correlationID)
  840. if err != nil {
  841. return err
  842. }
  843. requestLatency := time.Since(requestTime)
  844. b.updateIncomingCommunicationMetrics(bytesRead, requestLatency)
  845. return nil
  846. }
  847. func (b *Broker) sendAndReceiveSASLSCRAMv1() error {
  848. if err := b.sendAndReceiveSASLHandshake(b.conf.Net.SASL.Mechanism, SASLHandshakeV1); err != nil {
  849. return err
  850. }
  851. scramClient := b.conf.Net.SASL.SCRAMClientGeneratorFunc()
  852. if err := scramClient.Begin(b.conf.Net.SASL.User, b.conf.Net.SASL.Password, b.conf.Net.SASL.SCRAMAuthzID); err != nil {
  853. return fmt.Errorf("failed to start SCRAM exchange with the server: %s", err.Error())
  854. }
  855. msg, err := scramClient.Step("")
  856. if err != nil {
  857. return fmt.Errorf("failed to advance the SCRAM exchange: %s", err.Error())
  858. }
  859. for !scramClient.Done() {
  860. requestTime := time.Now()
  861. correlationID := b.correlationID
  862. bytesWritten, err := b.sendSaslAuthenticateRequest(correlationID, []byte(msg))
  863. if err != nil {
  864. Logger.Printf("Failed to write SASL auth header to broker %s: %s\n", b.addr, err.Error())
  865. return err
  866. }
  867. b.updateOutgoingCommunicationMetrics(bytesWritten)
  868. b.correlationID++
  869. challenge, err := b.receiveSaslAuthenticateResponse(correlationID)
  870. if err != nil {
  871. Logger.Printf("Failed to read response while authenticating with SASL to broker %s: %s\n", b.addr, err.Error())
  872. return err
  873. }
  874. b.updateIncomingCommunicationMetrics(len(challenge), time.Since(requestTime))
  875. msg, err = scramClient.Step(string(challenge))
  876. if err != nil {
  877. Logger.Println("SASL authentication failed", err)
  878. return err
  879. }
  880. }
  881. Logger.Println("SASL authentication succeeded")
  882. return nil
  883. }
  884. func (b *Broker) sendSaslAuthenticateRequest(correlationID int32, msg []byte) (int, error) {
  885. rb := &SaslAuthenticateRequest{msg}
  886. req := &request{correlationID: correlationID, clientID: b.conf.ClientID, body: rb}
  887. buf, err := encode(req, b.conf.MetricRegistry)
  888. if err != nil {
  889. return 0, err
  890. }
  891. if err := b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout)); err != nil {
  892. return 0, err
  893. }
  894. return b.conn.Write(buf)
  895. }
  896. func (b *Broker) receiveSaslAuthenticateResponse(correlationID int32) ([]byte, error) {
  897. buf := make([]byte, responseLengthSize+correlationIDSize)
  898. _, err := io.ReadFull(b.conn, buf)
  899. if err != nil {
  900. return nil, err
  901. }
  902. header := responseHeader{}
  903. err = decode(buf, &header)
  904. if err != nil {
  905. return nil, err
  906. }
  907. if header.correlationID != correlationID {
  908. return nil, fmt.Errorf("correlation ID didn't match, wanted %d, got %d", b.correlationID, header.correlationID)
  909. }
  910. buf = make([]byte, header.length-correlationIDSize)
  911. _, err = io.ReadFull(b.conn, buf)
  912. if err != nil {
  913. return nil, err
  914. }
  915. res := &SaslAuthenticateResponse{}
  916. if err := versionedDecode(buf, res, 0); err != nil {
  917. return nil, err
  918. }
  919. if res.Err != ErrNoError {
  920. return nil, res.Err
  921. }
  922. return res.SaslAuthBytes, nil
  923. }
  924. // Build SASL/OAUTHBEARER initial client response as described by RFC-7628
  925. // https://tools.ietf.org/html/rfc7628
  926. func buildClientInitialResponse(token *AccessToken) ([]byte, error) {
  927. var ext string
  928. if token.Extensions != nil && len(token.Extensions) > 0 {
  929. if _, ok := token.Extensions[SASLExtKeyAuth]; ok {
  930. return []byte{}, fmt.Errorf("the extension `%s` is invalid", SASLExtKeyAuth)
  931. }
  932. ext = "\x01" + mapToString(token.Extensions, "=", "\x01")
  933. }
  934. resp := []byte(fmt.Sprintf("n,,\x01auth=Bearer %s%s\x01\x01", token.Token, ext))
  935. return resp, nil
  936. }
  // mapToString renders extensions as a single string of key-value pairs.
  // Each pair is the key, keyValSep and the value concatenated. The pairs are
  // sorted lexicographically as whole strings and then joined with elemSep.
  // A nil or empty map yields the empty string.
  func mapToString(extensions map[string]string, keyValSep string, elemSep string) string {
  	pairs := make([]string, len(extensions))

  	next := 0
  	for key, value := range extensions {
  		pairs[next] = key + keyValSep + value
  		next++
  	}

  	// Map iteration order is random; sorting makes the output deterministic.
  	sort.Strings(pairs)

  	return strings.Join(pairs, elemSep)
  }
  947. func (b *Broker) sendSASLPlainAuthClientResponse(correlationID int32) (int, error) {
  948. authBytes := []byte("\x00" + b.conf.Net.SASL.User + "\x00" + b.conf.Net.SASL.Password)
  949. rb := &SaslAuthenticateRequest{authBytes}
  950. req := &request{correlationID: correlationID, clientID: b.conf.ClientID, body: rb}
  951. buf, err := encode(req, b.conf.MetricRegistry)
  952. if err != nil {
  953. return 0, err
  954. }
  955. err = b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout))
  956. if err != nil {
  957. Logger.Printf("Failed to set write deadline when doing SASL auth with broker %s: %s\n", b.addr, err.Error())
  958. return 0, err
  959. }
  960. return b.conn.Write(buf)
  961. }
  962. func (b *Broker) sendSASLOAuthBearerClientResponse(token *AccessToken, correlationID int32) (int, error) {
  963. initialResp, err := buildClientInitialResponse(token)
  964. if err != nil {
  965. return 0, err
  966. }
  967. rb := &SaslAuthenticateRequest{initialResp}
  968. req := &request{correlationID: correlationID, clientID: b.conf.ClientID, body: rb}
  969. buf, err := encode(req, b.conf.MetricRegistry)
  970. if err != nil {
  971. return 0, err
  972. }
  973. if err := b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout)); err != nil {
  974. return 0, err
  975. }
  976. return b.conn.Write(buf)
  977. }
  978. func (b *Broker) receiveSASLServerResponse(correlationID int32) (int, error) {
  979. buf := make([]byte, responseLengthSize+correlationIDSize)
  980. bytesRead, err := io.ReadFull(b.conn, buf)
  981. if err != nil {
  982. return bytesRead, err
  983. }
  984. header := responseHeader{}
  985. err = decode(buf, &header)
  986. if err != nil {
  987. return bytesRead, err
  988. }
  989. if header.correlationID != correlationID {
  990. return bytesRead, fmt.Errorf("correlation ID didn't match, wanted %d, got %d", b.correlationID, header.correlationID)
  991. }
  992. buf = make([]byte, header.length-correlationIDSize)
  993. c, err := io.ReadFull(b.conn, buf)
  994. bytesRead += c
  995. if err != nil {
  996. return bytesRead, err
  997. }
  998. res := &SaslAuthenticateResponse{}
  999. if err := versionedDecode(buf, res, 0); err != nil {
  1000. return bytesRead, err
  1001. }
  1002. if res.Err != ErrNoError {
  1003. return bytesRead, res.Err
  1004. }
  1005. if len(res.SaslAuthBytes) > 0 {
  1006. Logger.Printf("Received SASL auth response: %s", res.SaslAuthBytes)
  1007. }
  1008. return bytesRead, nil
  1009. }
  1010. func (b *Broker) updateIncomingCommunicationMetrics(bytes int, requestLatency time.Duration) {
  1011. b.updateRequestLatencyMetrics(requestLatency)
  1012. b.responseRate.Mark(1)
  1013. if b.brokerResponseRate != nil {
  1014. b.brokerResponseRate.Mark(1)
  1015. }
  1016. responseSize := int64(bytes)
  1017. b.incomingByteRate.Mark(responseSize)
  1018. if b.brokerIncomingByteRate != nil {
  1019. b.brokerIncomingByteRate.Mark(responseSize)
  1020. }
  1021. b.responseSize.Update(responseSize)
  1022. if b.brokerResponseSize != nil {
  1023. b.brokerResponseSize.Update(responseSize)
  1024. }
  1025. }
  1026. func (b *Broker) updateRequestLatencyMetrics(requestLatency time.Duration) {
  1027. requestLatencyInMs := int64(requestLatency / time.Millisecond)
  1028. b.requestLatency.Update(requestLatencyInMs)
  1029. if b.brokerRequestLatency != nil {
  1030. b.brokerRequestLatency.Update(requestLatencyInMs)
  1031. }
  1032. }
  1033. func (b *Broker) updateOutgoingCommunicationMetrics(bytes int) {
  1034. b.requestRate.Mark(1)
  1035. if b.brokerRequestRate != nil {
  1036. b.brokerRequestRate.Mark(1)
  1037. }
  1038. requestSize := int64(bytes)
  1039. b.outgoingByteRate.Mark(requestSize)
  1040. if b.brokerOutgoingByteRate != nil {
  1041. b.brokerOutgoingByteRate.Mark(requestSize)
  1042. }
  1043. b.requestSize.Update(requestSize)
  1044. if b.brokerRequestSize != nil {
  1045. b.brokerRequestSize.Update(requestSize)
  1046. }
  1047. }
  1048. func (b *Broker) registerMetrics() {
  1049. b.brokerIncomingByteRate = b.registerMeter("incoming-byte-rate")
  1050. b.brokerRequestRate = b.registerMeter("request-rate")
  1051. b.brokerRequestSize = b.registerHistogram("request-size")
  1052. b.brokerRequestLatency = b.registerHistogram("request-latency-in-ms")
  1053. b.brokerOutgoingByteRate = b.registerMeter("outgoing-byte-rate")
  1054. b.brokerResponseRate = b.registerMeter("response-rate")
  1055. b.brokerResponseSize = b.registerHistogram("response-size")
  1056. }
  1057. func (b *Broker) unregisterMetrics() {
  1058. for _, name := range b.registeredMetrics {
  1059. b.conf.MetricRegistry.Unregister(name)
  1060. }
  1061. }
  1062. func (b *Broker) registerMeter(name string) metrics.Meter {
  1063. nameForBroker := getMetricNameForBroker(name, b)
  1064. b.registeredMetrics = append(b.registeredMetrics, nameForBroker)
  1065. return metrics.GetOrRegisterMeter(nameForBroker, b.conf.MetricRegistry)
  1066. }
  1067. func (b *Broker) registerHistogram(name string) metrics.Histogram {
  1068. nameForBroker := getMetricNameForBroker(name, b)
  1069. b.registeredMetrics = append(b.registeredMetrics, nameForBroker)
  1070. return getOrRegisterHistogram(nameForBroker, b.conf.MetricRegistry)
  1071. }