broker.go 18 KB


  1. package sarama
  2. import (
  3. "crypto/tls"
  4. "encoding/binary"
  5. "fmt"
  6. "io"
  7. "net"
  8. "strconv"
  9. "sync"
  10. "sync/atomic"
  11. "time"
  12. "github.com/rcrowley/go-metrics"
  13. )
  14. // Broker represents a single Kafka broker connection. All operations on this object are entirely concurrency-safe.
  15. type Broker struct {
  16. id int32
  17. addr string
  18. conf *Config
  19. correlationID int32
  20. conn net.Conn
  21. connErr error
  22. lock sync.Mutex
  23. opened int32
  24. responses chan responsePromise
  25. done chan bool
  26. incomingByteRate metrics.Meter
  27. requestRate metrics.Meter
  28. requestSize metrics.Histogram
  29. requestLatency metrics.Histogram
  30. outgoingByteRate metrics.Meter
  31. responseRate metrics.Meter
  32. responseSize metrics.Histogram
  33. brokerIncomingByteRate metrics.Meter
  34. brokerRequestRate metrics.Meter
  35. brokerRequestSize metrics.Histogram
  36. brokerRequestLatency metrics.Histogram
  37. brokerOutgoingByteRate metrics.Meter
  38. brokerResponseRate metrics.Meter
  39. brokerResponseSize metrics.Histogram
  40. }
  41. type responsePromise struct {
  42. requestTime time.Time
  43. correlationID int32
  44. packets chan []byte
  45. errors chan error
  46. }
  47. // NewBroker creates and returns a Broker targeting the given host:port address.
  48. // This does not attempt to actually connect, you have to call Open() for that.
  49. func NewBroker(addr string) *Broker {
  50. return &Broker{id: -1, addr: addr}
  51. }
  52. // Open tries to connect to the Broker if it is not already connected or connecting, but does not block
  53. // waiting for the connection to complete. This means that any subsequent operations on the broker will
  54. // block waiting for the connection to succeed or fail. To get the effect of a fully synchronous Open call,
  55. // follow it by a call to Connected(). The only errors Open will return directly are ConfigurationError or
  56. // AlreadyConnected. If conf is nil, the result of NewConfig() is used.
  57. func (b *Broker) Open(conf *Config) error {
  58. if !atomic.CompareAndSwapInt32(&b.opened, 0, 1) {
  59. return ErrAlreadyConnected
  60. }
  61. if conf == nil {
  62. conf = NewConfig()
  63. }
  64. err := conf.Validate()
  65. if err != nil {
  66. return err
  67. }
  68. b.lock.Lock()
  69. go withRecover(func() {
  70. defer b.lock.Unlock()
  71. dialer := net.Dialer{
  72. Timeout: conf.Net.DialTimeout,
  73. KeepAlive: conf.Net.KeepAlive,
  74. }
  75. if conf.Net.TLS.Enable {
  76. b.conn, b.connErr = tls.DialWithDialer(&dialer, "tcp", b.addr, conf.Net.TLS.Config)
  77. } else {
  78. b.conn, b.connErr = dialer.Dial("tcp", b.addr)
  79. }
  80. if b.connErr != nil {
  81. Logger.Printf("Failed to connect to broker %s: %s\n", b.addr, b.connErr)
  82. b.conn = nil
  83. atomic.StoreInt32(&b.opened, 0)
  84. return
  85. }
  86. b.conn = newBufConn(b.conn)
  87. b.conf = conf
  88. // Create or reuse the global metrics shared between brokers
  89. b.incomingByteRate = metrics.GetOrRegisterMeter("incoming-byte-rate", conf.MetricRegistry)
  90. b.requestRate = metrics.GetOrRegisterMeter("request-rate", conf.MetricRegistry)
  91. b.requestSize = getOrRegisterHistogram("request-size", conf.MetricRegistry)
  92. b.requestLatency = getOrRegisterHistogram("request-latency-in-ms", conf.MetricRegistry)
  93. b.outgoingByteRate = metrics.GetOrRegisterMeter("outgoing-byte-rate", conf.MetricRegistry)
  94. b.responseRate = metrics.GetOrRegisterMeter("response-rate", conf.MetricRegistry)
  95. b.responseSize = getOrRegisterHistogram("response-size", conf.MetricRegistry)
  96. // Do not gather metrics for seeded broker (only used during bootstrap) because they share
  97. // the same id (-1) and are already exposed through the global metrics above
  98. if b.id >= 0 {
  99. b.brokerIncomingByteRate = getOrRegisterBrokerMeter("incoming-byte-rate", b, conf.MetricRegistry)
  100. b.brokerRequestRate = getOrRegisterBrokerMeter("request-rate", b, conf.MetricRegistry)
  101. b.brokerRequestSize = getOrRegisterBrokerHistogram("request-size", b, conf.MetricRegistry)
  102. b.brokerRequestLatency = getOrRegisterBrokerHistogram("request-latency-in-ms", b, conf.MetricRegistry)
  103. b.brokerOutgoingByteRate = getOrRegisterBrokerMeter("outgoing-byte-rate", b, conf.MetricRegistry)
  104. b.brokerResponseRate = getOrRegisterBrokerMeter("response-rate", b, conf.MetricRegistry)
  105. b.brokerResponseSize = getOrRegisterBrokerHistogram("response-size", b, conf.MetricRegistry)
  106. }
  107. if conf.Net.SASL.Enable {
  108. b.connErr = b.sendAndReceiveSASLPlainAuth()
  109. if b.connErr != nil {
  110. err = b.conn.Close()
  111. if err == nil {
  112. Logger.Printf("Closed connection to broker %s\n", b.addr)
  113. } else {
  114. Logger.Printf("Error while closing connection to broker %s: %s\n", b.addr, err)
  115. }
  116. b.conn = nil
  117. atomic.StoreInt32(&b.opened, 0)
  118. return
  119. }
  120. }
  121. b.done = make(chan bool)
  122. b.responses = make(chan responsePromise, b.conf.Net.MaxOpenRequests-1)
  123. if b.id >= 0 {
  124. Logger.Printf("Connected to broker at %s (registered as #%d)\n", b.addr, b.id)
  125. } else {
  126. Logger.Printf("Connected to broker at %s (unregistered)\n", b.addr)
  127. }
  128. go withRecover(b.responseReceiver)
  129. })
  130. return nil
  131. }
  132. // Connected returns true if the broker is connected and false otherwise. If the broker is not
  133. // connected but it had tried to connect, the error from that connection attempt is also returned.
  134. func (b *Broker) Connected() (bool, error) {
  135. b.lock.Lock()
  136. defer b.lock.Unlock()
  137. return b.conn != nil, b.connErr
  138. }
  139. func (b *Broker) Close() error {
  140. b.lock.Lock()
  141. defer b.lock.Unlock()
  142. if b.conn == nil {
  143. return ErrNotConnected
  144. }
  145. close(b.responses)
  146. <-b.done
  147. err := b.conn.Close()
  148. b.conn = nil
  149. b.connErr = nil
  150. b.done = nil
  151. b.responses = nil
  152. if err == nil {
  153. Logger.Printf("Closed connection to broker %s\n", b.addr)
  154. } else {
  155. Logger.Printf("Error while closing connection to broker %s: %s\n", b.addr, err)
  156. }
  157. atomic.StoreInt32(&b.opened, 0)
  158. return err
  159. }
  160. // ID returns the broker ID retrieved from Kafka's metadata, or -1 if that is not known.
  161. func (b *Broker) ID() int32 {
  162. return b.id
  163. }
  164. // Addr returns the broker address as either retrieved from Kafka's metadata or passed to NewBroker.
  165. func (b *Broker) Addr() string {
  166. return b.addr
  167. }
  168. func (b *Broker) GetMetadata(request *MetadataRequest) (*MetadataResponse, error) {
  169. response := new(MetadataResponse)
  170. err := b.sendAndReceive(request, response)
  171. if err != nil {
  172. return nil, err
  173. }
  174. return response, nil
  175. }
  176. func (b *Broker) GetConsumerMetadata(request *ConsumerMetadataRequest) (*ConsumerMetadataResponse, error) {
  177. response := new(ConsumerMetadataResponse)
  178. err := b.sendAndReceive(request, response)
  179. if err != nil {
  180. return nil, err
  181. }
  182. return response, nil
  183. }
  184. func (b *Broker) GetAvailableOffsets(request *OffsetRequest) (*OffsetResponse, error) {
  185. response := new(OffsetResponse)
  186. err := b.sendAndReceive(request, response)
  187. if err != nil {
  188. return nil, err
  189. }
  190. return response, nil
  191. }
  192. func (b *Broker) Produce(request *ProduceRequest) (*ProduceResponse, error) {
  193. var response *ProduceResponse
  194. var err error
  195. if request.RequiredAcks == NoResponse {
  196. err = b.sendAndReceive(request, nil)
  197. } else {
  198. response = new(ProduceResponse)
  199. err = b.sendAndReceive(request, response)
  200. }
  201. if err != nil {
  202. return nil, err
  203. }
  204. return response, nil
  205. }
  206. func (b *Broker) Fetch(request *FetchRequest) (*FetchResponse, error) {
  207. response := new(FetchResponse)
  208. err := b.sendAndReceive(request, response)
  209. if err != nil {
  210. return nil, err
  211. }
  212. return response, nil
  213. }
  214. func (b *Broker) CommitOffset(request *OffsetCommitRequest) (*OffsetCommitResponse, error) {
  215. response := new(OffsetCommitResponse)
  216. err := b.sendAndReceive(request, response)
  217. if err != nil {
  218. return nil, err
  219. }
  220. return response, nil
  221. }
  222. func (b *Broker) FetchOffset(request *OffsetFetchRequest) (*OffsetFetchResponse, error) {
  223. response := new(OffsetFetchResponse)
  224. err := b.sendAndReceive(request, response)
  225. if err != nil {
  226. return nil, err
  227. }
  228. return response, nil
  229. }
  230. func (b *Broker) JoinGroup(request *JoinGroupRequest) (*JoinGroupResponse, error) {
  231. response := new(JoinGroupResponse)
  232. err := b.sendAndReceive(request, response)
  233. if err != nil {
  234. return nil, err
  235. }
  236. return response, nil
  237. }
  238. func (b *Broker) SyncGroup(request *SyncGroupRequest) (*SyncGroupResponse, error) {
  239. response := new(SyncGroupResponse)
  240. err := b.sendAndReceive(request, response)
  241. if err != nil {
  242. return nil, err
  243. }
  244. return response, nil
  245. }
  246. func (b *Broker) LeaveGroup(request *LeaveGroupRequest) (*LeaveGroupResponse, error) {
  247. response := new(LeaveGroupResponse)
  248. err := b.sendAndReceive(request, response)
  249. if err != nil {
  250. return nil, err
  251. }
  252. return response, nil
  253. }
  254. func (b *Broker) Heartbeat(request *HeartbeatRequest) (*HeartbeatResponse, error) {
  255. response := new(HeartbeatResponse)
  256. err := b.sendAndReceive(request, response)
  257. if err != nil {
  258. return nil, err
  259. }
  260. return response, nil
  261. }
  262. func (b *Broker) ListGroups(request *ListGroupsRequest) (*ListGroupsResponse, error) {
  263. response := new(ListGroupsResponse)
  264. err := b.sendAndReceive(request, response)
  265. if err != nil {
  266. return nil, err
  267. }
  268. return response, nil
  269. }
  270. func (b *Broker) DescribeGroups(request *DescribeGroupsRequest) (*DescribeGroupsResponse, error) {
  271. response := new(DescribeGroupsResponse)
  272. err := b.sendAndReceive(request, response)
  273. if err != nil {
  274. return nil, err
  275. }
  276. return response, nil
  277. }
  278. func (b *Broker) ApiVersions(request *ApiVersionsRequest) (*ApiVersionsResponse, error) {
  279. response := new(ApiVersionsResponse)
  280. err := b.sendAndReceive(request, response)
  281. if err != nil {
  282. return nil, err
  283. }
  284. return response, nil
  285. }
  286. func (b *Broker) send(rb protocolBody, promiseResponse bool) (*responsePromise, error) {
  287. b.lock.Lock()
  288. defer b.lock.Unlock()
  289. if b.conn == nil {
  290. if b.connErr != nil {
  291. return nil, b.connErr
  292. }
  293. return nil, ErrNotConnected
  294. }
  295. if !b.conf.Version.IsAtLeast(rb.requiredVersion()) {
  296. return nil, ErrUnsupportedVersion
  297. }
  298. req := &request{correlationID: b.correlationID, clientID: b.conf.ClientID, body: rb}
  299. buf, err := encode(req, b.conf.MetricRegistry)
  300. if err != nil {
  301. return nil, err
  302. }
  303. err = b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout))
  304. if err != nil {
  305. return nil, err
  306. }
  307. requestTime := time.Now()
  308. bytes, err := b.conn.Write(buf)
  309. b.updateOutgoingCommunicationMetrics(bytes)
  310. if err != nil {
  311. return nil, err
  312. }
  313. b.correlationID++
  314. if !promiseResponse {
  315. // Record request latency without the response
  316. b.updateRequestLatencyMetrics(time.Since(requestTime))
  317. return nil, nil
  318. }
  319. promise := responsePromise{requestTime, req.correlationID, make(chan []byte), make(chan error)}
  320. b.responses <- promise
  321. return &promise, nil
  322. }
  323. func (b *Broker) sendAndReceive(req protocolBody, res versionedDecoder) error {
  324. promise, err := b.send(req, res != nil)
  325. if err != nil {
  326. return err
  327. }
  328. if promise == nil {
  329. return nil
  330. }
  331. select {
  332. case buf := <-promise.packets:
  333. return versionedDecode(buf, res, req.version())
  334. case err = <-promise.errors:
  335. return err
  336. }
  337. }
  338. func (b *Broker) decode(pd packetDecoder) (err error) {
  339. b.id, err = pd.getInt32()
  340. if err != nil {
  341. return err
  342. }
  343. host, err := pd.getString()
  344. if err != nil {
  345. return err
  346. }
  347. port, err := pd.getInt32()
  348. if err != nil {
  349. return err
  350. }
  351. b.addr = net.JoinHostPort(host, fmt.Sprint(port))
  352. if _, _, err := net.SplitHostPort(b.addr); err != nil {
  353. return err
  354. }
  355. return nil
  356. }
  357. func (b *Broker) encode(pe packetEncoder) (err error) {
  358. host, portstr, err := net.SplitHostPort(b.addr)
  359. if err != nil {
  360. return err
  361. }
  362. port, err := strconv.Atoi(portstr)
  363. if err != nil {
  364. return err
  365. }
  366. pe.putInt32(b.id)
  367. err = pe.putString(host)
  368. if err != nil {
  369. return err
  370. }
  371. pe.putInt32(int32(port))
  372. return nil
  373. }
  374. func (b *Broker) responseReceiver() {
  375. var dead error
  376. header := make([]byte, 8)
  377. for response := range b.responses {
  378. if dead != nil {
  379. response.errors <- dead
  380. continue
  381. }
  382. err := b.conn.SetReadDeadline(time.Now().Add(b.conf.Net.ReadTimeout))
  383. if err != nil {
  384. dead = err
  385. response.errors <- err
  386. continue
  387. }
  388. bytesReadHeader, err := io.ReadFull(b.conn, header)
  389. requestLatency := time.Since(response.requestTime)
  390. if err != nil {
  391. b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
  392. dead = err
  393. response.errors <- err
  394. continue
  395. }
  396. decodedHeader := responseHeader{}
  397. err = decode(header, &decodedHeader)
  398. if err != nil {
  399. b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
  400. dead = err
  401. response.errors <- err
  402. continue
  403. }
  404. if decodedHeader.correlationID != response.correlationID {
  405. b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
  406. // TODO if decoded ID < cur ID, discard until we catch up
  407. // TODO if decoded ID > cur ID, save it so when cur ID catches up we have a response
  408. dead = PacketDecodingError{fmt.Sprintf("correlation ID didn't match, wanted %d, got %d", response.correlationID, decodedHeader.correlationID)}
  409. response.errors <- dead
  410. continue
  411. }
  412. buf := make([]byte, decodedHeader.length-4)
  413. bytesReadBody, err := io.ReadFull(b.conn, buf)
  414. b.updateIncomingCommunicationMetrics(bytesReadHeader+bytesReadBody, requestLatency)
  415. if err != nil {
  416. dead = err
  417. response.errors <- err
  418. continue
  419. }
  420. response.packets <- buf
  421. }
  422. close(b.done)
  423. }
  424. func (b *Broker) sendAndReceiveSASLPlainHandshake() error {
  425. rb := &SaslHandshakeRequest{"PLAIN"}
  426. req := &request{correlationID: b.correlationID, clientID: b.conf.ClientID, body: rb}
  427. buf, err := encode(req, b.conf.MetricRegistry)
  428. if err != nil {
  429. return err
  430. }
  431. err = b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout))
  432. if err != nil {
  433. return err
  434. }
  435. requestTime := time.Now()
  436. bytes, err := b.conn.Write(buf)
  437. b.updateOutgoingCommunicationMetrics(bytes)
  438. if err != nil {
  439. Logger.Printf("Failed to send SASL handshake %s: %s\n", b.addr, err.Error())
  440. return err
  441. }
  442. b.correlationID++
  443. //wait for the response
  444. header := make([]byte, 8) // response header
  445. _, err = io.ReadFull(b.conn, header)
  446. if err != nil {
  447. Logger.Printf("Failed to read SASL handshake header : %s\n", err.Error())
  448. return err
  449. }
  450. length := binary.BigEndian.Uint32(header[:4])
  451. payload := make([]byte, length-4)
  452. n, err := io.ReadFull(b.conn, payload)
  453. if err != nil {
  454. Logger.Printf("Failed to read SASL handshake payload : %s\n", err.Error())
  455. return err
  456. }
  457. b.updateIncomingCommunicationMetrics(n+8, time.Since(requestTime))
  458. res := &SaslHandshakeResponse{}
  459. err = versionedDecode(payload, res, 0)
  460. if err != nil {
  461. Logger.Printf("Failed to parse SASL handshake : %s\n", err.Error())
  462. return err
  463. }
  464. if res.Err != ErrNoError {
  465. Logger.Printf("Invalid SASL Mechanism : %s\n", res.Err.Error())
  466. return res.Err
  467. }
  468. Logger.Print("Successful SASL handshake")
  469. return nil
  470. }
  471. // Kafka 0.10.0 plans to support SASL Plain and Kerberos as per PR #812 (KIP-43)/(JIRA KAFKA-3149)
  472. // Some hosted kafka services such as IBM Message Hub already offer SASL/PLAIN auth with Kafka 0.9
  473. //
  474. // In SASL Plain, Kafka expects the auth header to be in the following format
  475. // Message format (from https://tools.ietf.org/html/rfc4616):
  476. //
  477. // message = [authzid] UTF8NUL authcid UTF8NUL passwd
  478. // authcid = 1*SAFE ; MUST accept up to 255 octets
  479. // authzid = 1*SAFE ; MUST accept up to 255 octets
  480. // passwd = 1*SAFE ; MUST accept up to 255 octets
  481. // UTF8NUL = %x00 ; UTF-8 encoded NUL character
  482. //
  483. // SAFE = UTF1 / UTF2 / UTF3 / UTF4
  484. // ;; any UTF-8 encoded Unicode character except NUL
  485. //
  486. // When credentials are valid, Kafka returns a 4 byte array of null characters.
  487. // When credentials are invalid, Kafka closes the connection. This does not seem to be the ideal way
  488. // of responding to bad credentials but thats how its being done today.
  489. func (b *Broker) sendAndReceiveSASLPlainAuth() error {
  490. if b.conf.Net.SASL.Handshake {
  491. handshakeErr := b.sendAndReceiveSASLPlainHandshake()
  492. if handshakeErr != nil {
  493. Logger.Printf("Error while performing SASL handshake %s\n", b.addr)
  494. return handshakeErr
  495. }
  496. }
  497. length := 1 + len(b.conf.Net.SASL.User) + 1 + len(b.conf.Net.SASL.Password)
  498. authBytes := make([]byte, length+4) //4 byte length header + auth data
  499. binary.BigEndian.PutUint32(authBytes, uint32(length))
  500. copy(authBytes[4:], []byte("\x00"+b.conf.Net.SASL.User+"\x00"+b.conf.Net.SASL.Password))
  501. err := b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout))
  502. if err != nil {
  503. Logger.Printf("Failed to set write deadline when doing SASL auth with broker %s: %s\n", b.addr, err.Error())
  504. return err
  505. }
  506. requestTime := time.Now()
  507. bytesWritten, err := b.conn.Write(authBytes)
  508. b.updateOutgoingCommunicationMetrics(bytesWritten)
  509. if err != nil {
  510. Logger.Printf("Failed to write SASL auth header to broker %s: %s\n", b.addr, err.Error())
  511. return err
  512. }
  513. header := make([]byte, 4)
  514. n, err := io.ReadFull(b.conn, header)
  515. b.updateIncomingCommunicationMetrics(n, time.Since(requestTime))
  516. // If the credentials are valid, we would get a 4 byte response filled with null characters.
  517. // Otherwise, the broker closes the connection and we get an EOF
  518. if err != nil {
  519. Logger.Printf("Failed to read response while authenticating with SASL to broker %s: %s\n", b.addr, err.Error())
  520. return err
  521. }
  522. Logger.Printf("SASL authentication successful with broker %s:%v - %v\n", b.addr, n, header)
  523. return nil
  524. }
  525. func (b *Broker) updateIncomingCommunicationMetrics(bytes int, requestLatency time.Duration) {
  526. b.updateRequestLatencyMetrics(requestLatency)
  527. b.responseRate.Mark(1)
  528. if b.brokerResponseRate != nil {
  529. b.brokerResponseRate.Mark(1)
  530. }
  531. responseSize := int64(bytes)
  532. b.incomingByteRate.Mark(responseSize)
  533. if b.brokerIncomingByteRate != nil {
  534. b.brokerIncomingByteRate.Mark(responseSize)
  535. }
  536. b.responseSize.Update(responseSize)
  537. if b.brokerResponseSize != nil {
  538. b.brokerResponseSize.Update(responseSize)
  539. }
  540. }
  541. func (b *Broker) updateRequestLatencyMetrics(requestLatency time.Duration) {
  542. requestLatencyInMs := int64(requestLatency / time.Millisecond)
  543. b.requestLatency.Update(requestLatencyInMs)
  544. if b.brokerRequestLatency != nil {
  545. b.brokerRequestLatency.Update(requestLatencyInMs)
  546. }
  547. }
  548. func (b *Broker) updateOutgoingCommunicationMetrics(bytes int) {
  549. b.requestRate.Mark(1)
  550. if b.brokerRequestRate != nil {
  551. b.brokerRequestRate.Mark(1)
  552. }
  553. requestSize := int64(bytes)
  554. b.outgoingByteRate.Mark(requestSize)
  555. if b.brokerOutgoingByteRate != nil {
  556. b.brokerOutgoingByteRate.Mark(requestSize)
  557. }
  558. b.requestSize.Update(requestSize)
  559. if b.brokerRequestSize != nil {
  560. b.brokerRequestSize.Update(requestSize)
  561. }
  562. }