broker.go 40 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442
  1. package sarama
  2. import (
  3. "crypto/tls"
  4. "encoding/binary"
  5. "fmt"
  6. "io"
  7. "net"
  8. "sort"
  9. "strconv"
  10. "strings"
  11. "sync"
  12. "sync/atomic"
  13. "time"
  14. metrics "github.com/rcrowley/go-metrics"
  15. )
  16. // Broker represents a single Kafka broker connection. All operations on this object are entirely concurrency-safe.
  17. type Broker struct {
  18. conf *Config
  19. rack *string
  20. id int32
  21. addr string
  22. correlationID int32
  23. conn net.Conn
  24. connErr error
  25. lock sync.Mutex
  26. opened int32
  27. responses chan responsePromise
  28. done chan bool
  29. registeredMetrics []string
  30. incomingByteRate metrics.Meter
  31. requestRate metrics.Meter
  32. requestSize metrics.Histogram
  33. requestLatency metrics.Histogram
  34. outgoingByteRate metrics.Meter
  35. responseRate metrics.Meter
  36. responseSize metrics.Histogram
  37. requestsInFlight metrics.Counter
  38. brokerIncomingByteRate metrics.Meter
  39. brokerRequestRate metrics.Meter
  40. brokerRequestSize metrics.Histogram
  41. brokerRequestLatency metrics.Histogram
  42. brokerOutgoingByteRate metrics.Meter
  43. brokerResponseRate metrics.Meter
  44. brokerResponseSize metrics.Histogram
  45. brokerRequestsInFlight metrics.Counter
  46. kerberosAuthenticator GSSAPIKerberosAuth
  47. }
  48. // SASLMechanism specifies the SASL mechanism the client uses to authenticate with the broker
  49. type SASLMechanism string
  50. const (
  51. // SASLTypeOAuth represents the SASL/OAUTHBEARER mechanism (Kafka 2.0.0+)
  52. SASLTypeOAuth = "OAUTHBEARER"
  53. // SASLTypePlaintext represents the SASL/PLAIN mechanism
  54. SASLTypePlaintext = "PLAIN"
  55. // SASLTypeSCRAMSHA256 represents the SCRAM-SHA-256 mechanism.
  56. SASLTypeSCRAMSHA256 = "SCRAM-SHA-256"
  57. // SASLTypeSCRAMSHA512 represents the SCRAM-SHA-512 mechanism.
  58. SASLTypeSCRAMSHA512 = "SCRAM-SHA-512"
  59. SASLTypeGSSAPI = "GSSAPI"
  60. // SASLHandshakeV0 is v0 of the Kafka SASL handshake protocol. Client and
  61. // server negotiate SASL auth using opaque packets.
  62. SASLHandshakeV0 = int16(0)
  63. // SASLHandshakeV1 is v1 of the Kafka SASL handshake protocol. Client and
  64. // server negotiate SASL by wrapping tokens with Kafka protocol headers.
  65. SASLHandshakeV1 = int16(1)
  66. // SASLExtKeyAuth is the reserved extension key name sent as part of the
  67. // SASL/OAUTHBEARER initial client response
  68. SASLExtKeyAuth = "auth"
  69. )
  70. // AccessToken contains an access token used to authenticate a
  71. // SASL/OAUTHBEARER client along with associated metadata.
  72. type AccessToken struct {
  73. // Token is the access token payload.
  74. Token string
  75. // Extensions is a optional map of arbitrary key-value pairs that can be
  76. // sent with the SASL/OAUTHBEARER initial client response. These values are
  77. // ignored by the SASL server if they are unexpected. This feature is only
  78. // supported by Kafka >= 2.1.0.
  79. Extensions map[string]string
  80. }
  81. // AccessTokenProvider is the interface that encapsulates how implementors
  82. // can generate access tokens for Kafka broker authentication.
  83. type AccessTokenProvider interface {
  84. // Token returns an access token. The implementation should ensure token
  85. // reuse so that multiple calls at connect time do not create multiple
  86. // tokens. The implementation should also periodically refresh the token in
  87. // order to guarantee that each call returns an unexpired token. This
  88. // method should not block indefinitely--a timeout error should be returned
  89. // after a short period of inactivity so that the broker connection logic
  90. // can log debugging information and retry.
  91. Token() (*AccessToken, error)
  92. }
  93. // SCRAMClient is a an interface to a SCRAM
  94. // client implementation.
  95. type SCRAMClient interface {
  96. // Begin prepares the client for the SCRAM exchange
  97. // with the server with a user name and a password
  98. Begin(userName, password, authzID string) error
  99. // Step steps client through the SCRAM exchange. It is
  100. // called repeatedly until it errors or `Done` returns true.
  101. Step(challenge string) (response string, err error)
  102. // Done should return true when the SCRAM conversation
  103. // is over.
  104. Done() bool
  105. }
  106. type responsePromise struct {
  107. requestTime time.Time
  108. correlationID int32
  109. headerVersion int16
  110. packets chan []byte
  111. errors chan error
  112. }
  113. // NewBroker creates and returns a Broker targeting the given host:port address.
  114. // This does not attempt to actually connect, you have to call Open() for that.
  115. func NewBroker(addr string) *Broker {
  116. return &Broker{id: -1, addr: addr}
  117. }
  118. // Open tries to connect to the Broker if it is not already connected or connecting, but does not block
  119. // waiting for the connection to complete. This means that any subsequent operations on the broker will
  120. // block waiting for the connection to succeed or fail. To get the effect of a fully synchronous Open call,
  121. // follow it by a call to Connected(). The only errors Open will return directly are ConfigurationError or
  122. // AlreadyConnected. If conf is nil, the result of NewConfig() is used.
  123. func (b *Broker) Open(conf *Config) error {
  124. if !atomic.CompareAndSwapInt32(&b.opened, 0, 1) {
  125. return ErrAlreadyConnected
  126. }
  127. if conf == nil {
  128. conf = NewConfig()
  129. }
  130. err := conf.Validate()
  131. if err != nil {
  132. return err
  133. }
  134. b.lock.Lock()
  135. go withRecover(func() {
  136. defer b.lock.Unlock()
  137. dialer := conf.getDialer()
  138. b.conn, b.connErr = dialer.Dial("tcp", b.addr)
  139. if b.connErr != nil {
  140. Logger.Printf("Failed to connect to broker %s: %s\n", b.addr, b.connErr)
  141. b.conn = nil
  142. atomic.StoreInt32(&b.opened, 0)
  143. return
  144. }
  145. if conf.Net.TLS.Enable {
  146. Logger.Printf("Using tls")
  147. cfg := conf.Net.TLS.Config
  148. if cfg == nil {
  149. cfg = &tls.Config{}
  150. }
  151. // If no ServerName is set, infer the ServerName
  152. // from the hostname we're connecting to.
  153. // Gets the hostname as tls.DialWithDialer does it.
  154. if cfg.ServerName == "" {
  155. colonPos := strings.LastIndex(b.addr, ":")
  156. if colonPos == -1 {
  157. colonPos = len(b.addr)
  158. }
  159. hostname := b.addr[:colonPos]
  160. cfg.ServerName = hostname
  161. }
  162. b.conn = tls.Client(b.conn, cfg)
  163. }
  164. b.conn = newBufConn(b.conn)
  165. b.conf = conf
  166. // Create or reuse the global metrics shared between brokers
  167. b.incomingByteRate = metrics.GetOrRegisterMeter("incoming-byte-rate", conf.MetricRegistry)
  168. b.requestRate = metrics.GetOrRegisterMeter("request-rate", conf.MetricRegistry)
  169. b.requestSize = getOrRegisterHistogram("request-size", conf.MetricRegistry)
  170. b.requestLatency = getOrRegisterHistogram("request-latency-in-ms", conf.MetricRegistry)
  171. b.outgoingByteRate = metrics.GetOrRegisterMeter("outgoing-byte-rate", conf.MetricRegistry)
  172. b.responseRate = metrics.GetOrRegisterMeter("response-rate", conf.MetricRegistry)
  173. b.responseSize = getOrRegisterHistogram("response-size", conf.MetricRegistry)
  174. b.requestsInFlight = metrics.GetOrRegisterCounter("requests-in-flight", conf.MetricRegistry)
  175. // Do not gather metrics for seeded broker (only used during bootstrap) because they share
  176. // the same id (-1) and are already exposed through the global metrics above
  177. if b.id >= 0 {
  178. b.registerMetrics()
  179. }
  180. if conf.Net.SASL.Enable {
  181. b.connErr = b.authenticateViaSASL()
  182. if b.connErr != nil {
  183. err = b.conn.Close()
  184. if err == nil {
  185. Logger.Printf("Closed connection to broker %s\n", b.addr)
  186. } else {
  187. Logger.Printf("Error while closing connection to broker %s: %s\n", b.addr, err)
  188. }
  189. b.conn = nil
  190. atomic.StoreInt32(&b.opened, 0)
  191. return
  192. }
  193. }
  194. b.done = make(chan bool)
  195. b.responses = make(chan responsePromise, b.conf.Net.MaxOpenRequests-1)
  196. if b.id >= 0 {
  197. Logger.Printf("Connected to broker at %s (registered as #%d)\n", b.addr, b.id)
  198. } else {
  199. Logger.Printf("Connected to broker at %s (unregistered)\n", b.addr)
  200. }
  201. go withRecover(b.responseReceiver)
  202. })
  203. return nil
  204. }
  205. // Connected returns true if the broker is connected and false otherwise. If the broker is not
  206. // connected but it had tried to connect, the error from that connection attempt is also returned.
  207. func (b *Broker) Connected() (bool, error) {
  208. b.lock.Lock()
  209. defer b.lock.Unlock()
  210. return b.conn != nil, b.connErr
  211. }
  212. //Close closes the broker resources
  213. func (b *Broker) Close() error {
  214. b.lock.Lock()
  215. defer b.lock.Unlock()
  216. if b.conn == nil {
  217. return ErrNotConnected
  218. }
  219. close(b.responses)
  220. <-b.done
  221. err := b.conn.Close()
  222. b.conn = nil
  223. b.connErr = nil
  224. b.done = nil
  225. b.responses = nil
  226. b.unregisterMetrics()
  227. if err == nil {
  228. Logger.Printf("Closed connection to broker %s\n", b.addr)
  229. } else {
  230. Logger.Printf("Error while closing connection to broker %s: %s\n", b.addr, err)
  231. }
  232. atomic.StoreInt32(&b.opened, 0)
  233. return err
  234. }
  235. // ID returns the broker ID retrieved from Kafka's metadata, or -1 if that is not known.
  236. func (b *Broker) ID() int32 {
  237. return b.id
  238. }
  239. // Addr returns the broker address as either retrieved from Kafka's metadata or passed to NewBroker.
  240. func (b *Broker) Addr() string {
  241. return b.addr
  242. }
  243. // Rack returns the broker's rack as retrieved from Kafka's metadata or the
  244. // empty string if it is not known. The returned value corresponds to the
  245. // broker's broker.rack configuration setting. Requires protocol version to be
  246. // at least v0.10.0.0.
  247. func (b *Broker) Rack() string {
  248. if b.rack == nil {
  249. return ""
  250. }
  251. return *b.rack
  252. }
  253. //GetMetadata send a metadata request and returns a metadata response or error
  254. func (b *Broker) GetMetadata(request *MetadataRequest) (*MetadataResponse, error) {
  255. response := new(MetadataResponse)
  256. err := b.sendAndReceive(request, response)
  257. if err != nil {
  258. return nil, err
  259. }
  260. return response, nil
  261. }
  262. //GetConsumerMetadata send a consumer metadata request and returns a consumer metadata response or error
  263. func (b *Broker) GetConsumerMetadata(request *ConsumerMetadataRequest) (*ConsumerMetadataResponse, error) {
  264. response := new(ConsumerMetadataResponse)
  265. err := b.sendAndReceive(request, response)
  266. if err != nil {
  267. return nil, err
  268. }
  269. return response, nil
  270. }
  271. //FindCoordinator sends a find coordinate request and returns a response or error
  272. func (b *Broker) FindCoordinator(request *FindCoordinatorRequest) (*FindCoordinatorResponse, error) {
  273. response := new(FindCoordinatorResponse)
  274. err := b.sendAndReceive(request, response)
  275. if err != nil {
  276. return nil, err
  277. }
  278. return response, nil
  279. }
  280. //GetAvailableOffsets return an offset response or error
  281. func (b *Broker) GetAvailableOffsets(request *OffsetRequest) (*OffsetResponse, error) {
  282. response := new(OffsetResponse)
  283. err := b.sendAndReceive(request, response)
  284. if err != nil {
  285. return nil, err
  286. }
  287. return response, nil
  288. }
  289. //Produce returns a produce response or error
  290. func (b *Broker) Produce(request *ProduceRequest) (*ProduceResponse, error) {
  291. var (
  292. response *ProduceResponse
  293. err error
  294. )
  295. if request.RequiredAcks == NoResponse {
  296. err = b.sendAndReceive(request, nil)
  297. } else {
  298. response = new(ProduceResponse)
  299. err = b.sendAndReceive(request, response)
  300. }
  301. if err != nil {
  302. return nil, err
  303. }
  304. return response, nil
  305. }
  306. //Fetch returns a FetchResponse or error
  307. func (b *Broker) Fetch(request *FetchRequest) (*FetchResponse, error) {
  308. response := new(FetchResponse)
  309. err := b.sendAndReceive(request, response)
  310. if err != nil {
  311. return nil, err
  312. }
  313. return response, nil
  314. }
  315. //CommitOffset return an Offset commit response or error
  316. func (b *Broker) CommitOffset(request *OffsetCommitRequest) (*OffsetCommitResponse, error) {
  317. response := new(OffsetCommitResponse)
  318. err := b.sendAndReceive(request, response)
  319. if err != nil {
  320. return nil, err
  321. }
  322. return response, nil
  323. }
  324. //FetchOffset returns an offset fetch response or error
  325. func (b *Broker) FetchOffset(request *OffsetFetchRequest) (*OffsetFetchResponse, error) {
  326. response := new(OffsetFetchResponse)
  327. err := b.sendAndReceive(request, response)
  328. if err != nil {
  329. return nil, err
  330. }
  331. return response, nil
  332. }
  333. //JoinGroup returns a join group response or error
  334. func (b *Broker) JoinGroup(request *JoinGroupRequest) (*JoinGroupResponse, error) {
  335. response := new(JoinGroupResponse)
  336. err := b.sendAndReceive(request, response)
  337. if err != nil {
  338. return nil, err
  339. }
  340. return response, nil
  341. }
  342. //SyncGroup returns a sync group response or error
  343. func (b *Broker) SyncGroup(request *SyncGroupRequest) (*SyncGroupResponse, error) {
  344. response := new(SyncGroupResponse)
  345. err := b.sendAndReceive(request, response)
  346. if err != nil {
  347. return nil, err
  348. }
  349. return response, nil
  350. }
  351. //LeaveGroup return a leave group response or error
  352. func (b *Broker) LeaveGroup(request *LeaveGroupRequest) (*LeaveGroupResponse, error) {
  353. response := new(LeaveGroupResponse)
  354. err := b.sendAndReceive(request, response)
  355. if err != nil {
  356. return nil, err
  357. }
  358. return response, nil
  359. }
  360. //Heartbeat returns a heartbeat response or error
  361. func (b *Broker) Heartbeat(request *HeartbeatRequest) (*HeartbeatResponse, error) {
  362. response := new(HeartbeatResponse)
  363. err := b.sendAndReceive(request, response)
  364. if err != nil {
  365. return nil, err
  366. }
  367. return response, nil
  368. }
  369. //ListGroups return a list group response or error
  370. func (b *Broker) ListGroups(request *ListGroupsRequest) (*ListGroupsResponse, error) {
  371. response := new(ListGroupsResponse)
  372. err := b.sendAndReceive(request, response)
  373. if err != nil {
  374. return nil, err
  375. }
  376. return response, nil
  377. }
  378. //DescribeGroups return describe group response or error
  379. func (b *Broker) DescribeGroups(request *DescribeGroupsRequest) (*DescribeGroupsResponse, error) {
  380. response := new(DescribeGroupsResponse)
  381. err := b.sendAndReceive(request, response)
  382. if err != nil {
  383. return nil, err
  384. }
  385. return response, nil
  386. }
  387. //ApiVersions return api version response or error
  388. func (b *Broker) ApiVersions(request *ApiVersionsRequest) (*ApiVersionsResponse, error) {
  389. response := new(ApiVersionsResponse)
  390. err := b.sendAndReceive(request, response)
  391. if err != nil {
  392. return nil, err
  393. }
  394. return response, nil
  395. }
  396. //CreateTopics send a create topic request and returns create topic response
  397. func (b *Broker) CreateTopics(request *CreateTopicsRequest) (*CreateTopicsResponse, error) {
  398. response := new(CreateTopicsResponse)
  399. err := b.sendAndReceive(request, response)
  400. if err != nil {
  401. return nil, err
  402. }
  403. return response, nil
  404. }
  405. //DeleteTopics sends a delete topic request and returns delete topic response
  406. func (b *Broker) DeleteTopics(request *DeleteTopicsRequest) (*DeleteTopicsResponse, error) {
  407. response := new(DeleteTopicsResponse)
  408. err := b.sendAndReceive(request, response)
  409. if err != nil {
  410. return nil, err
  411. }
  412. return response, nil
  413. }
  414. //CreatePartitions sends a create partition request and returns create
  415. //partitions response or error
  416. func (b *Broker) CreatePartitions(request *CreatePartitionsRequest) (*CreatePartitionsResponse, error) {
  417. response := new(CreatePartitionsResponse)
  418. err := b.sendAndReceive(request, response)
  419. if err != nil {
  420. return nil, err
  421. }
  422. return response, nil
  423. }
  424. //AlterPartitionReassignments sends a alter partition reassignments request and
  425. //returns alter partition reassignments response
  426. func (b *Broker) AlterPartitionReassignments(request *AlterPartitionReassignmentsRequest) (*AlterPartitionReassignmentsResponse, error) {
  427. response := new(AlterPartitionReassignmentsResponse)
  428. err := b.sendAndReceive(request, response)
  429. if err != nil {
  430. return nil, err
  431. }
  432. return response, nil
  433. }
  434. //ListPartitionReassignments sends a list partition reassignments request and
  435. //returns list partition reassignments response
  436. func (b *Broker) ListPartitionReassignments(request *ListPartitionReassignmentsRequest) (*ListPartitionReassignmentsResponse, error) {
  437. response := new(ListPartitionReassignmentsResponse)
  438. err := b.sendAndReceive(request, response)
  439. if err != nil {
  440. return nil, err
  441. }
  442. return response, nil
  443. }
  444. //DeleteRecords send a request to delete records and return delete record
  445. //response or error
  446. func (b *Broker) DeleteRecords(request *DeleteRecordsRequest) (*DeleteRecordsResponse, error) {
  447. response := new(DeleteRecordsResponse)
  448. err := b.sendAndReceive(request, response)
  449. if err != nil {
  450. return nil, err
  451. }
  452. return response, nil
  453. }
  454. //DescribeAcls sends a describe acl request and returns a response or error
  455. func (b *Broker) DescribeAcls(request *DescribeAclsRequest) (*DescribeAclsResponse, error) {
  456. response := new(DescribeAclsResponse)
  457. err := b.sendAndReceive(request, response)
  458. if err != nil {
  459. return nil, err
  460. }
  461. return response, nil
  462. }
  463. //CreateAcls sends a create acl request and returns a response or error
  464. func (b *Broker) CreateAcls(request *CreateAclsRequest) (*CreateAclsResponse, error) {
  465. response := new(CreateAclsResponse)
  466. err := b.sendAndReceive(request, response)
  467. if err != nil {
  468. return nil, err
  469. }
  470. return response, nil
  471. }
  472. //DeleteAcls sends a delete acl request and returns a response or error
  473. func (b *Broker) DeleteAcls(request *DeleteAclsRequest) (*DeleteAclsResponse, error) {
  474. response := new(DeleteAclsResponse)
  475. err := b.sendAndReceive(request, response)
  476. if err != nil {
  477. return nil, err
  478. }
  479. return response, nil
  480. }
  481. //InitProducerID sends an init producer request and returns a response or error
  482. func (b *Broker) InitProducerID(request *InitProducerIDRequest) (*InitProducerIDResponse, error) {
  483. response := new(InitProducerIDResponse)
  484. err := b.sendAndReceive(request, response)
  485. if err != nil {
  486. return nil, err
  487. }
  488. return response, nil
  489. }
  490. //AddPartitionsToTxn send a request to add partition to txn and returns
  491. //a response or error
  492. func (b *Broker) AddPartitionsToTxn(request *AddPartitionsToTxnRequest) (*AddPartitionsToTxnResponse, error) {
  493. response := new(AddPartitionsToTxnResponse)
  494. err := b.sendAndReceive(request, response)
  495. if err != nil {
  496. return nil, err
  497. }
  498. return response, nil
  499. }
  500. //AddOffsetsToTxn sends a request to add offsets to txn and returns a response
  501. //or error
  502. func (b *Broker) AddOffsetsToTxn(request *AddOffsetsToTxnRequest) (*AddOffsetsToTxnResponse, error) {
  503. response := new(AddOffsetsToTxnResponse)
  504. err := b.sendAndReceive(request, response)
  505. if err != nil {
  506. return nil, err
  507. }
  508. return response, nil
  509. }
  510. //EndTxn sends a request to end txn and returns a response or error
  511. func (b *Broker) EndTxn(request *EndTxnRequest) (*EndTxnResponse, error) {
  512. response := new(EndTxnResponse)
  513. err := b.sendAndReceive(request, response)
  514. if err != nil {
  515. return nil, err
  516. }
  517. return response, nil
  518. }
  519. //TxnOffsetCommit sends a request to commit transaction offsets and returns
  520. //a response or error
  521. func (b *Broker) TxnOffsetCommit(request *TxnOffsetCommitRequest) (*TxnOffsetCommitResponse, error) {
  522. response := new(TxnOffsetCommitResponse)
  523. err := b.sendAndReceive(request, response)
  524. if err != nil {
  525. return nil, err
  526. }
  527. return response, nil
  528. }
  529. //DescribeConfigs sends a request to describe config and returns a response or
  530. //error
  531. func (b *Broker) DescribeConfigs(request *DescribeConfigsRequest) (*DescribeConfigsResponse, error) {
  532. response := new(DescribeConfigsResponse)
  533. err := b.sendAndReceive(request, response)
  534. if err != nil {
  535. return nil, err
  536. }
  537. return response, nil
  538. }
  539. //AlterConfigs sends a request to alter config and return a response or error
  540. func (b *Broker) AlterConfigs(request *AlterConfigsRequest) (*AlterConfigsResponse, error) {
  541. response := new(AlterConfigsResponse)
  542. err := b.sendAndReceive(request, response)
  543. if err != nil {
  544. return nil, err
  545. }
  546. return response, nil
  547. }
  548. //DeleteGroups sends a request to delete groups and returns a response or error
  549. func (b *Broker) DeleteGroups(request *DeleteGroupsRequest) (*DeleteGroupsResponse, error) {
  550. response := new(DeleteGroupsResponse)
  551. if err := b.sendAndReceive(request, response); err != nil {
  552. return nil, err
  553. }
  554. return response, nil
  555. }
  556. //DescribeLogDirs sends a request to get the broker's log dir paths and sizes
  557. func (b *Broker) DescribeLogDirs(request *DescribeLogDirsRequest) (*DescribeLogDirsResponse, error) {
  558. response := new(DescribeLogDirsResponse)
  559. err := b.sendAndReceive(request, response)
  560. if err != nil {
  561. return nil, err
  562. }
  563. return response, nil
  564. }
  565. // readFull ensures the conn ReadDeadline has been setup before making a
  566. // call to io.ReadFull
  567. func (b *Broker) readFull(buf []byte) (n int, err error) {
  568. if err := b.conn.SetReadDeadline(time.Now().Add(b.conf.Net.ReadTimeout)); err != nil {
  569. return 0, err
  570. }
  571. return io.ReadFull(b.conn, buf)
  572. }
  573. // write ensures the conn WriteDeadline has been setup before making a
  574. // call to conn.Write
  575. func (b *Broker) write(buf []byte) (n int, err error) {
  576. if err := b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout)); err != nil {
  577. return 0, err
  578. }
  579. return b.conn.Write(buf)
  580. }
  581. func (b *Broker) send(rb protocolBody, promiseResponse bool, responseHeaderVersion int16) (*responsePromise, error) {
  582. b.lock.Lock()
  583. defer b.lock.Unlock()
  584. if b.conn == nil {
  585. if b.connErr != nil {
  586. return nil, b.connErr
  587. }
  588. return nil, ErrNotConnected
  589. }
  590. if !b.conf.Version.IsAtLeast(rb.requiredVersion()) {
  591. return nil, ErrUnsupportedVersion
  592. }
  593. req := &request{correlationID: b.correlationID, clientID: b.conf.ClientID, body: rb}
  594. buf, err := encode(req, b.conf.MetricRegistry)
  595. if err != nil {
  596. return nil, err
  597. }
  598. requestTime := time.Now()
  599. // Will be decremented in responseReceiver (except error or request with NoResponse)
  600. b.addRequestInFlightMetrics(1)
  601. bytes, err := b.write(buf)
  602. b.updateOutgoingCommunicationMetrics(bytes)
  603. if err != nil {
  604. b.addRequestInFlightMetrics(-1)
  605. return nil, err
  606. }
  607. b.correlationID++
  608. if !promiseResponse {
  609. // Record request latency without the response
  610. b.updateRequestLatencyAndInFlightMetrics(time.Since(requestTime))
  611. return nil, nil
  612. }
  613. promise := responsePromise{requestTime, req.correlationID, responseHeaderVersion, make(chan []byte), make(chan error)}
  614. b.responses <- promise
  615. return &promise, nil
  616. }
  617. func (b *Broker) sendAndReceive(req protocolBody, res protocolBody) error {
  618. responseHeaderVersion := int16(-1)
  619. if res != nil {
  620. responseHeaderVersion = res.headerVersion()
  621. }
  622. promise, err := b.send(req, res != nil, responseHeaderVersion)
  623. if err != nil {
  624. return err
  625. }
  626. if promise == nil {
  627. return nil
  628. }
  629. select {
  630. case buf := <-promise.packets:
  631. return versionedDecode(buf, res, req.version())
  632. case err = <-promise.errors:
  633. return err
  634. }
  635. }
  636. func (b *Broker) decode(pd packetDecoder, version int16) (err error) {
  637. b.id, err = pd.getInt32()
  638. if err != nil {
  639. return err
  640. }
  641. host, err := pd.getString()
  642. if err != nil {
  643. return err
  644. }
  645. port, err := pd.getInt32()
  646. if err != nil {
  647. return err
  648. }
  649. if version >= 1 {
  650. b.rack, err = pd.getNullableString()
  651. if err != nil {
  652. return err
  653. }
  654. }
  655. b.addr = net.JoinHostPort(host, fmt.Sprint(port))
  656. if _, _, err := net.SplitHostPort(b.addr); err != nil {
  657. return err
  658. }
  659. return nil
  660. }
  661. func (b *Broker) encode(pe packetEncoder, version int16) (err error) {
  662. host, portstr, err := net.SplitHostPort(b.addr)
  663. if err != nil {
  664. return err
  665. }
  666. port, err := strconv.Atoi(portstr)
  667. if err != nil {
  668. return err
  669. }
  670. pe.putInt32(b.id)
  671. err = pe.putString(host)
  672. if err != nil {
  673. return err
  674. }
  675. pe.putInt32(int32(port))
  676. if version >= 1 {
  677. err = pe.putNullableString(b.rack)
  678. if err != nil {
  679. return err
  680. }
  681. }
  682. return nil
  683. }
  684. func (b *Broker) responseReceiver() {
  685. var dead error
  686. for response := range b.responses {
  687. if dead != nil {
  688. // This was previously incremented in send() and
  689. // we are not calling updateIncomingCommunicationMetrics()
  690. b.addRequestInFlightMetrics(-1)
  691. response.errors <- dead
  692. continue
  693. }
  694. var headerLength = getHeaderLength(response.headerVersion)
  695. header := make([]byte, headerLength)
  696. bytesReadHeader, err := b.readFull(header)
  697. requestLatency := time.Since(response.requestTime)
  698. if err != nil {
  699. b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
  700. dead = err
  701. response.errors <- err
  702. continue
  703. }
  704. decodedHeader := responseHeader{}
  705. err = versionedDecode(header, &decodedHeader, response.headerVersion)
  706. if err != nil {
  707. b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
  708. dead = err
  709. response.errors <- err
  710. continue
  711. }
  712. if decodedHeader.correlationID != response.correlationID {
  713. b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
  714. // TODO if decoded ID < cur ID, discard until we catch up
  715. // TODO if decoded ID > cur ID, save it so when cur ID catches up we have a response
  716. dead = PacketDecodingError{fmt.Sprintf("correlation ID didn't match, wanted %d, got %d", response.correlationID, decodedHeader.correlationID)}
  717. response.errors <- dead
  718. continue
  719. }
  720. buf := make([]byte, decodedHeader.length-int32(headerLength)+4)
  721. bytesReadBody, err := b.readFull(buf)
  722. b.updateIncomingCommunicationMetrics(bytesReadHeader+bytesReadBody, requestLatency)
  723. if err != nil {
  724. dead = err
  725. response.errors <- err
  726. continue
  727. }
  728. response.packets <- buf
  729. }
  730. close(b.done)
  731. }
  732. func getHeaderLength(headerVersion int16) int8 {
  733. if headerVersion < 1 {
  734. return 8
  735. } else {
  736. // header contains additional tagged field length (0), we don't support actual tags yet.
  737. return 9
  738. }
  739. }
  740. func (b *Broker) authenticateViaSASL() error {
  741. switch b.conf.Net.SASL.Mechanism {
  742. case SASLTypeOAuth:
  743. return b.sendAndReceiveSASLOAuth(b.conf.Net.SASL.TokenProvider)
  744. case SASLTypeSCRAMSHA256, SASLTypeSCRAMSHA512:
  745. return b.sendAndReceiveSASLSCRAMv1()
  746. case SASLTypeGSSAPI:
  747. return b.sendAndReceiveKerberos()
  748. default:
  749. return b.sendAndReceiveSASLPlainAuth()
  750. }
  751. }
  752. func (b *Broker) sendAndReceiveKerberos() error {
  753. b.kerberosAuthenticator.Config = &b.conf.Net.SASL.GSSAPI
  754. if b.kerberosAuthenticator.NewKerberosClientFunc == nil {
  755. b.kerberosAuthenticator.NewKerberosClientFunc = NewKerberosClient
  756. }
  757. return b.kerberosAuthenticator.Authorize(b)
  758. }
  759. func (b *Broker) sendAndReceiveSASLHandshake(saslType SASLMechanism, version int16) error {
  760. rb := &SaslHandshakeRequest{Mechanism: string(saslType), Version: version}
  761. req := &request{correlationID: b.correlationID, clientID: b.conf.ClientID, body: rb}
  762. buf, err := encode(req, b.conf.MetricRegistry)
  763. if err != nil {
  764. return err
  765. }
  766. requestTime := time.Now()
  767. // Will be decremented in updateIncomingCommunicationMetrics (except error)
  768. b.addRequestInFlightMetrics(1)
  769. bytes, err := b.write(buf)
  770. b.updateOutgoingCommunicationMetrics(bytes)
  771. if err != nil {
  772. b.addRequestInFlightMetrics(-1)
  773. Logger.Printf("Failed to send SASL handshake %s: %s\n", b.addr, err.Error())
  774. return err
  775. }
  776. b.correlationID++
  777. header := make([]byte, 8) // response header
  778. _, err = b.readFull(header)
  779. if err != nil {
  780. b.addRequestInFlightMetrics(-1)
  781. Logger.Printf("Failed to read SASL handshake header : %s\n", err.Error())
  782. return err
  783. }
  784. length := binary.BigEndian.Uint32(header[:4])
  785. payload := make([]byte, length-4)
  786. n, err := b.readFull(payload)
  787. if err != nil {
  788. b.addRequestInFlightMetrics(-1)
  789. Logger.Printf("Failed to read SASL handshake payload : %s\n", err.Error())
  790. return err
  791. }
  792. b.updateIncomingCommunicationMetrics(n+8, time.Since(requestTime))
  793. res := &SaslHandshakeResponse{}
  794. err = versionedDecode(payload, res, 0)
  795. if err != nil {
  796. Logger.Printf("Failed to parse SASL handshake : %s\n", err.Error())
  797. return err
  798. }
  799. if res.Err != ErrNoError {
  800. Logger.Printf("Invalid SASL Mechanism : %s\n", res.Err.Error())
  801. return res.Err
  802. }
  803. Logger.Print("Successful SASL handshake. Available mechanisms: ", res.EnabledMechanisms)
  804. return nil
  805. }
  806. // Kafka 0.10.x supported SASL PLAIN/Kerberos via KAFKA-3149 (KIP-43).
  807. // Kafka 1.x.x onward added a SaslAuthenticate request/response message which
  808. // wraps the SASL flow in the Kafka protocol, which allows for returning
  809. // meaningful errors on authentication failure.
  810. //
  811. // In SASL Plain, Kafka expects the auth header to be in the following format
  812. // Message format (from https://tools.ietf.org/html/rfc4616):
  813. //
  814. // message = [authzid] UTF8NUL authcid UTF8NUL passwd
  815. // authcid = 1*SAFE ; MUST accept up to 255 octets
  816. // authzid = 1*SAFE ; MUST accept up to 255 octets
  817. // passwd = 1*SAFE ; MUST accept up to 255 octets
  818. // UTF8NUL = %x00 ; UTF-8 encoded NUL character
  819. //
  820. // SAFE = UTF1 / UTF2 / UTF3 / UTF4
  821. // ;; any UTF-8 encoded Unicode character except NUL
  822. //
  823. // With SASL v0 handshake and auth then:
  824. // When credentials are valid, Kafka returns a 4 byte array of null characters.
  825. // When credentials are invalid, Kafka closes the connection.
  826. //
  827. // With SASL v1 handshake and auth then:
  828. // When credentials are invalid, Kafka replies with a SaslAuthenticate response
  829. // containing an error code and message detailing the authentication failure.
  830. func (b *Broker) sendAndReceiveSASLPlainAuth() error {
  831. // default to V0 to allow for backward compatibility when SASL is enabled
  832. // but not the handshake
  833. if b.conf.Net.SASL.Handshake {
  834. handshakeErr := b.sendAndReceiveSASLHandshake(SASLTypePlaintext, b.conf.Net.SASL.Version)
  835. if handshakeErr != nil {
  836. Logger.Printf("Error while performing SASL handshake %s\n", b.addr)
  837. return handshakeErr
  838. }
  839. }
  840. if b.conf.Net.SASL.Version == SASLHandshakeV1 {
  841. return b.sendAndReceiveV1SASLPlainAuth()
  842. }
  843. return b.sendAndReceiveV0SASLPlainAuth()
  844. }
  845. // sendAndReceiveV0SASLPlainAuth flows the v0 sasl auth NOT wrapped in the kafka protocol
  846. func (b *Broker) sendAndReceiveV0SASLPlainAuth() error {
  847. length := len(b.conf.Net.SASL.AuthIdentity) + 1 + len(b.conf.Net.SASL.User) + 1 + len(b.conf.Net.SASL.Password)
  848. authBytes := make([]byte, length+4) //4 byte length header + auth data
  849. binary.BigEndian.PutUint32(authBytes, uint32(length))
  850. copy(authBytes[4:], []byte(b.conf.Net.SASL.AuthIdentity+"\x00"+b.conf.Net.SASL.User+"\x00"+b.conf.Net.SASL.Password))
  851. requestTime := time.Now()
  852. // Will be decremented in updateIncomingCommunicationMetrics (except error)
  853. b.addRequestInFlightMetrics(1)
  854. bytesWritten, err := b.write(authBytes)
  855. b.updateOutgoingCommunicationMetrics(bytesWritten)
  856. if err != nil {
  857. b.addRequestInFlightMetrics(-1)
  858. Logger.Printf("Failed to write SASL auth header to broker %s: %s\n", b.addr, err.Error())
  859. return err
  860. }
  861. header := make([]byte, 4)
  862. n, err := b.readFull(header)
  863. b.updateIncomingCommunicationMetrics(n, time.Since(requestTime))
  864. // If the credentials are valid, we would get a 4 byte response filled with null characters.
  865. // Otherwise, the broker closes the connection and we get an EOF
  866. if err != nil {
  867. Logger.Printf("Failed to read response while authenticating with SASL to broker %s: %s\n", b.addr, err.Error())
  868. return err
  869. }
  870. Logger.Printf("SASL authentication successful with broker %s:%v - %v\n", b.addr, n, header)
  871. return nil
  872. }
  873. // sendAndReceiveV1SASLPlainAuth flows the v1 sasl authentication using the kafka protocol
  874. func (b *Broker) sendAndReceiveV1SASLPlainAuth() error {
  875. correlationID := b.correlationID
  876. requestTime := time.Now()
  877. // Will be decremented in updateIncomingCommunicationMetrics (except error)
  878. b.addRequestInFlightMetrics(1)
  879. bytesWritten, err := b.sendSASLPlainAuthClientResponse(correlationID)
  880. b.updateOutgoingCommunicationMetrics(bytesWritten)
  881. if err != nil {
  882. b.addRequestInFlightMetrics(-1)
  883. Logger.Printf("Failed to write SASL auth header to broker %s: %s\n", b.addr, err.Error())
  884. return err
  885. }
  886. b.correlationID++
  887. bytesRead, err := b.receiveSASLServerResponse(&SaslAuthenticateResponse{}, correlationID)
  888. b.updateIncomingCommunicationMetrics(bytesRead, time.Since(requestTime))
  889. // With v1 sasl we get an error message set in the response we can return
  890. if err != nil {
  891. Logger.Printf("Error returned from broker during SASL flow %s: %s\n", b.addr, err.Error())
  892. return err
  893. }
  894. return nil
  895. }
  896. // sendAndReceiveSASLOAuth performs the authentication flow as described by KIP-255
  897. // https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=75968876
  898. func (b *Broker) sendAndReceiveSASLOAuth(provider AccessTokenProvider) error {
  899. if err := b.sendAndReceiveSASLHandshake(SASLTypeOAuth, SASLHandshakeV1); err != nil {
  900. return err
  901. }
  902. token, err := provider.Token()
  903. if err != nil {
  904. return err
  905. }
  906. message, err := buildClientFirstMessage(token)
  907. if err != nil {
  908. return err
  909. }
  910. challenged, err := b.sendClientMessage(message)
  911. if err != nil {
  912. return err
  913. }
  914. if challenged {
  915. // Abort the token exchange. The broker returns the failure code.
  916. _, err = b.sendClientMessage([]byte(`\x01`))
  917. }
  918. return err
  919. }
  920. // sendClientMessage sends a SASL/OAUTHBEARER client message and returns true
  921. // if the broker responds with a challenge, in which case the token is
  922. // rejected.
  923. func (b *Broker) sendClientMessage(message []byte) (bool, error) {
  924. requestTime := time.Now()
  925. // Will be decremented in updateIncomingCommunicationMetrics (except error)
  926. b.addRequestInFlightMetrics(1)
  927. correlationID := b.correlationID
  928. bytesWritten, err := b.sendSASLOAuthBearerClientMessage(message, correlationID)
  929. b.updateOutgoingCommunicationMetrics(bytesWritten)
  930. if err != nil {
  931. b.addRequestInFlightMetrics(-1)
  932. return false, err
  933. }
  934. b.correlationID++
  935. res := &SaslAuthenticateResponse{}
  936. bytesRead, err := b.receiveSASLServerResponse(res, correlationID)
  937. requestLatency := time.Since(requestTime)
  938. b.updateIncomingCommunicationMetrics(bytesRead, requestLatency)
  939. isChallenge := len(res.SaslAuthBytes) > 0
  940. if isChallenge && err != nil {
  941. Logger.Printf("Broker rejected authentication token: %s", res.SaslAuthBytes)
  942. }
  943. return isChallenge, err
  944. }
  945. func (b *Broker) sendAndReceiveSASLSCRAMv1() error {
  946. if err := b.sendAndReceiveSASLHandshake(b.conf.Net.SASL.Mechanism, SASLHandshakeV1); err != nil {
  947. return err
  948. }
  949. scramClient := b.conf.Net.SASL.SCRAMClientGeneratorFunc()
  950. if err := scramClient.Begin(b.conf.Net.SASL.User, b.conf.Net.SASL.Password, b.conf.Net.SASL.SCRAMAuthzID); err != nil {
  951. return fmt.Errorf("failed to start SCRAM exchange with the server: %s", err.Error())
  952. }
  953. msg, err := scramClient.Step("")
  954. if err != nil {
  955. return fmt.Errorf("failed to advance the SCRAM exchange: %s", err.Error())
  956. }
  957. for !scramClient.Done() {
  958. requestTime := time.Now()
  959. // Will be decremented in updateIncomingCommunicationMetrics (except error)
  960. b.addRequestInFlightMetrics(1)
  961. correlationID := b.correlationID
  962. bytesWritten, err := b.sendSaslAuthenticateRequest(correlationID, []byte(msg))
  963. b.updateOutgoingCommunicationMetrics(bytesWritten)
  964. if err != nil {
  965. b.addRequestInFlightMetrics(-1)
  966. Logger.Printf("Failed to write SASL auth header to broker %s: %s\n", b.addr, err.Error())
  967. return err
  968. }
  969. b.correlationID++
  970. challenge, err := b.receiveSaslAuthenticateResponse(correlationID)
  971. if err != nil {
  972. b.addRequestInFlightMetrics(-1)
  973. Logger.Printf("Failed to read response while authenticating with SASL to broker %s: %s\n", b.addr, err.Error())
  974. return err
  975. }
  976. b.updateIncomingCommunicationMetrics(len(challenge), time.Since(requestTime))
  977. msg, err = scramClient.Step(string(challenge))
  978. if err != nil {
  979. Logger.Println("SASL authentication failed", err)
  980. return err
  981. }
  982. }
  983. Logger.Println("SASL authentication succeeded")
  984. return nil
  985. }
  986. func (b *Broker) sendSaslAuthenticateRequest(correlationID int32, msg []byte) (int, error) {
  987. rb := &SaslAuthenticateRequest{msg}
  988. req := &request{correlationID: correlationID, clientID: b.conf.ClientID, body: rb}
  989. buf, err := encode(req, b.conf.MetricRegistry)
  990. if err != nil {
  991. return 0, err
  992. }
  993. return b.write(buf)
  994. }
  995. func (b *Broker) receiveSaslAuthenticateResponse(correlationID int32) ([]byte, error) {
  996. buf := make([]byte, responseLengthSize+correlationIDSize)
  997. _, err := b.readFull(buf)
  998. if err != nil {
  999. return nil, err
  1000. }
  1001. header := responseHeader{}
  1002. err = versionedDecode(buf, &header, 0)
  1003. if err != nil {
  1004. return nil, err
  1005. }
  1006. if header.correlationID != correlationID {
  1007. return nil, fmt.Errorf("correlation ID didn't match, wanted %d, got %d", b.correlationID, header.correlationID)
  1008. }
  1009. buf = make([]byte, header.length-correlationIDSize)
  1010. _, err = b.readFull(buf)
  1011. if err != nil {
  1012. return nil, err
  1013. }
  1014. res := &SaslAuthenticateResponse{}
  1015. if err := versionedDecode(buf, res, 0); err != nil {
  1016. return nil, err
  1017. }
  1018. if res.Err != ErrNoError {
  1019. return nil, res.Err
  1020. }
  1021. return res.SaslAuthBytes, nil
  1022. }
  1023. // Build SASL/OAUTHBEARER initial client response as described by RFC-7628
  1024. // https://tools.ietf.org/html/rfc7628
  1025. func buildClientFirstMessage(token *AccessToken) ([]byte, error) {
  1026. var ext string
  1027. if token.Extensions != nil && len(token.Extensions) > 0 {
  1028. if _, ok := token.Extensions[SASLExtKeyAuth]; ok {
  1029. return []byte{}, fmt.Errorf("the extension `%s` is invalid", SASLExtKeyAuth)
  1030. }
  1031. ext = "\x01" + mapToString(token.Extensions, "=", "\x01")
  1032. }
  1033. resp := []byte(fmt.Sprintf("n,,\x01auth=Bearer %s%s\x01\x01", token.Token, ext))
  1034. return resp, nil
  1035. }
  1036. // mapToString returns a list of key-value pairs ordered by key.
  1037. // keyValSep separates the key from the value. elemSep separates each pair.
  1038. func mapToString(extensions map[string]string, keyValSep string, elemSep string) string {
  1039. buf := make([]string, 0, len(extensions))
  1040. for k, v := range extensions {
  1041. buf = append(buf, k+keyValSep+v)
  1042. }
  1043. sort.Strings(buf)
  1044. return strings.Join(buf, elemSep)
  1045. }
  1046. func (b *Broker) sendSASLPlainAuthClientResponse(correlationID int32) (int, error) {
  1047. authBytes := []byte(b.conf.Net.SASL.AuthIdentity + "\x00" + b.conf.Net.SASL.User + "\x00" + b.conf.Net.SASL.Password)
  1048. rb := &SaslAuthenticateRequest{authBytes}
  1049. req := &request{correlationID: correlationID, clientID: b.conf.ClientID, body: rb}
  1050. buf, err := encode(req, b.conf.MetricRegistry)
  1051. if err != nil {
  1052. return 0, err
  1053. }
  1054. return b.write(buf)
  1055. }
  1056. func (b *Broker) sendSASLOAuthBearerClientMessage(initialResp []byte, correlationID int32) (int, error) {
  1057. rb := &SaslAuthenticateRequest{initialResp}
  1058. req := &request{correlationID: correlationID, clientID: b.conf.ClientID, body: rb}
  1059. buf, err := encode(req, b.conf.MetricRegistry)
  1060. if err != nil {
  1061. return 0, err
  1062. }
  1063. return b.write(buf)
  1064. }
  1065. func (b *Broker) receiveSASLServerResponse(res *SaslAuthenticateResponse, correlationID int32) (int, error) {
  1066. buf := make([]byte, responseLengthSize+correlationIDSize)
  1067. bytesRead, err := b.readFull(buf)
  1068. if err != nil {
  1069. return bytesRead, err
  1070. }
  1071. header := responseHeader{}
  1072. err = versionedDecode(buf, &header, 0)
  1073. if err != nil {
  1074. return bytesRead, err
  1075. }
  1076. if header.correlationID != correlationID {
  1077. return bytesRead, fmt.Errorf("correlation ID didn't match, wanted %d, got %d", b.correlationID, header.correlationID)
  1078. }
  1079. buf = make([]byte, header.length-correlationIDSize)
  1080. c, err := b.readFull(buf)
  1081. bytesRead += c
  1082. if err != nil {
  1083. return bytesRead, err
  1084. }
  1085. if err := versionedDecode(buf, res, 0); err != nil {
  1086. return bytesRead, err
  1087. }
  1088. if res.Err != ErrNoError {
  1089. return bytesRead, res.Err
  1090. }
  1091. return bytesRead, nil
  1092. }
  1093. func (b *Broker) updateIncomingCommunicationMetrics(bytes int, requestLatency time.Duration) {
  1094. b.updateRequestLatencyAndInFlightMetrics(requestLatency)
  1095. b.responseRate.Mark(1)
  1096. if b.brokerResponseRate != nil {
  1097. b.brokerResponseRate.Mark(1)
  1098. }
  1099. responseSize := int64(bytes)
  1100. b.incomingByteRate.Mark(responseSize)
  1101. if b.brokerIncomingByteRate != nil {
  1102. b.brokerIncomingByteRate.Mark(responseSize)
  1103. }
  1104. b.responseSize.Update(responseSize)
  1105. if b.brokerResponseSize != nil {
  1106. b.brokerResponseSize.Update(responseSize)
  1107. }
  1108. }
  1109. func (b *Broker) updateRequestLatencyAndInFlightMetrics(requestLatency time.Duration) {
  1110. requestLatencyInMs := int64(requestLatency / time.Millisecond)
  1111. b.requestLatency.Update(requestLatencyInMs)
  1112. if b.brokerRequestLatency != nil {
  1113. b.brokerRequestLatency.Update(requestLatencyInMs)
  1114. }
  1115. b.addRequestInFlightMetrics(-1)
  1116. }
  1117. func (b *Broker) addRequestInFlightMetrics(i int64) {
  1118. b.requestsInFlight.Inc(i)
  1119. if b.brokerRequestsInFlight != nil {
  1120. b.brokerRequestsInFlight.Inc(i)
  1121. }
  1122. }
  1123. func (b *Broker) updateOutgoingCommunicationMetrics(bytes int) {
  1124. b.requestRate.Mark(1)
  1125. if b.brokerRequestRate != nil {
  1126. b.brokerRequestRate.Mark(1)
  1127. }
  1128. requestSize := int64(bytes)
  1129. b.outgoingByteRate.Mark(requestSize)
  1130. if b.brokerOutgoingByteRate != nil {
  1131. b.brokerOutgoingByteRate.Mark(requestSize)
  1132. }
  1133. b.requestSize.Update(requestSize)
  1134. if b.brokerRequestSize != nil {
  1135. b.brokerRequestSize.Update(requestSize)
  1136. }
  1137. }
  1138. func (b *Broker) registerMetrics() {
  1139. b.brokerIncomingByteRate = b.registerMeter("incoming-byte-rate")
  1140. b.brokerRequestRate = b.registerMeter("request-rate")
  1141. b.brokerRequestSize = b.registerHistogram("request-size")
  1142. b.brokerRequestLatency = b.registerHistogram("request-latency-in-ms")
  1143. b.brokerOutgoingByteRate = b.registerMeter("outgoing-byte-rate")
  1144. b.brokerResponseRate = b.registerMeter("response-rate")
  1145. b.brokerResponseSize = b.registerHistogram("response-size")
  1146. b.brokerRequestsInFlight = b.registerCounter("requests-in-flight")
  1147. }
  1148. func (b *Broker) unregisterMetrics() {
  1149. for _, name := range b.registeredMetrics {
  1150. b.conf.MetricRegistry.Unregister(name)
  1151. }
  1152. b.registeredMetrics = nil
  1153. }
  1154. func (b *Broker) registerMeter(name string) metrics.Meter {
  1155. nameForBroker := getMetricNameForBroker(name, b)
  1156. b.registeredMetrics = append(b.registeredMetrics, nameForBroker)
  1157. return metrics.GetOrRegisterMeter(nameForBroker, b.conf.MetricRegistry)
  1158. }
  1159. func (b *Broker) registerHistogram(name string) metrics.Histogram {
  1160. nameForBroker := getMetricNameForBroker(name, b)
  1161. b.registeredMetrics = append(b.registeredMetrics, nameForBroker)
  1162. return getOrRegisterHistogram(nameForBroker, b.conf.MetricRegistry)
  1163. }
  1164. func (b *Broker) registerCounter(name string) metrics.Counter {
  1165. nameForBroker := getMetricNameForBroker(name, b)
  1166. b.registeredMetrics = append(b.registeredMetrics, nameForBroker)
  1167. return metrics.GetOrRegisterCounter(nameForBroker, b.conf.MetricRegistry)
  1168. }