broker.go 18 KB


  1. package sarama
  2. import (
  3. "crypto/tls"
  4. "encoding/binary"
  5. "fmt"
  6. "io"
  7. "net"
  8. "strconv"
  9. "sync"
  10. "sync/atomic"
  11. "time"
  12. "github.com/rcrowley/go-metrics"
  13. )
  14. // Broker represents a single Kafka broker connection. All operations on this object are entirely concurrency-safe.
  15. type Broker struct {
  16. id int32
  17. addr string
  18. conf *Config
  19. correlationID int32
  20. conn net.Conn
  21. connErr error
  22. lock sync.Mutex
  23. opened int32
  24. responses chan responsePromise
  25. done chan bool
  26. incomingByteRate metrics.Meter
  27. requestRate metrics.Meter
  28. requestSize metrics.Histogram
  29. requestLatency metrics.Histogram
  30. outgoingByteRate metrics.Meter
  31. responseRate metrics.Meter
  32. responseSize metrics.Histogram
  33. brokerIncomingByteRate metrics.Meter
  34. brokerRequestRate metrics.Meter
  35. brokerRequestSize metrics.Histogram
  36. brokerRequestLatency metrics.Histogram
  37. brokerOutgoingByteRate metrics.Meter
  38. brokerResponseRate metrics.Meter
  39. brokerResponseSize metrics.Histogram
  40. }
  41. type responsePromise struct {
  42. requestTime time.Time
  43. correlationID int32
  44. packets chan []byte
  45. errors chan error
  46. }
  47. // NewBroker creates and returns a Broker targeting the given host:port address.
  48. // This does not attempt to actually connect, you have to call Open() for that.
  49. func NewBroker(addr string) *Broker {
  50. return &Broker{id: -1, addr: addr}
  51. }
  52. // Open tries to connect to the Broker if it is not already connected or connecting, but does not block
  53. // waiting for the connection to complete. This means that any subsequent operations on the broker will
  54. // block waiting for the connection to succeed or fail. To get the effect of a fully synchronous Open call,
  55. // follow it by a call to Connected(). The only errors Open will return directly are ConfigurationError or
  56. // AlreadyConnected. If conf is nil, the result of NewConfig() is used.
  57. func (b *Broker) Open(conf *Config) error {
  58. if !atomic.CompareAndSwapInt32(&b.opened, 0, 1) {
  59. return ErrAlreadyConnected
  60. }
  61. if conf == nil {
  62. conf = NewConfig()
  63. }
  64. err := conf.Validate()
  65. if err != nil {
  66. return err
  67. }
  68. b.lock.Lock()
  69. go withRecover(func() {
  70. defer b.lock.Unlock()
  71. dialer := net.Dialer{
  72. Timeout: conf.Net.DialTimeout,
  73. KeepAlive: conf.Net.KeepAlive,
  74. }
  75. if conf.Net.TLS.Enable {
  76. b.conn, b.connErr = tls.DialWithDialer(&dialer, "tcp", b.addr, conf.Net.TLS.Config)
  77. } else {
  78. b.conn, b.connErr = dialer.Dial("tcp", b.addr)
  79. }
  80. if b.connErr != nil {
  81. Logger.Printf("Failed to connect to broker %s: %s\n", b.addr, b.connErr)
  82. b.conn = nil
  83. atomic.StoreInt32(&b.opened, 0)
  84. return
  85. }
  86. b.conn = newBufConn(b.conn)
  87. b.conf = conf
  88. // Create or reuse the global metrics shared between brokers
  89. b.incomingByteRate = metrics.GetOrRegisterMeter("incoming-byte-rate", conf.MetricRegistry)
  90. b.requestRate = metrics.GetOrRegisterMeter("request-rate", conf.MetricRegistry)
  91. b.requestSize = getOrRegisterHistogram("request-size", conf.MetricRegistry)
  92. b.requestLatency = getOrRegisterHistogram("request-latency-in-ms", conf.MetricRegistry)
  93. b.outgoingByteRate = metrics.GetOrRegisterMeter("outgoing-byte-rate", conf.MetricRegistry)
  94. b.responseRate = metrics.GetOrRegisterMeter("response-rate", conf.MetricRegistry)
  95. b.responseSize = getOrRegisterHistogram("response-size", conf.MetricRegistry)
  96. // Do not gather metrics for seeded broker (only used during bootstrap) because they share
  97. // the same id (-1) and are already exposed through the global metrics above
  98. if b.id >= 0 {
  99. b.brokerIncomingByteRate = getOrRegisterBrokerMeter("incoming-byte-rate", b, conf.MetricRegistry)
  100. b.brokerRequestRate = getOrRegisterBrokerMeter("request-rate", b, conf.MetricRegistry)
  101. b.brokerRequestSize = getOrRegisterBrokerHistogram("request-size", b, conf.MetricRegistry)
  102. b.brokerRequestLatency = getOrRegisterBrokerHistogram("request-latency-in-ms", b, conf.MetricRegistry)
  103. b.brokerOutgoingByteRate = getOrRegisterBrokerMeter("outgoing-byte-rate", b, conf.MetricRegistry)
  104. b.brokerResponseRate = getOrRegisterBrokerMeter("response-rate", b, conf.MetricRegistry)
  105. b.brokerResponseSize = getOrRegisterBrokerHistogram("response-size", b, conf.MetricRegistry)
  106. }
  107. if conf.Net.SASL.Enable {
  108. b.connErr = b.sendAndReceiveSASLPlainAuth()
  109. if b.connErr != nil {
  110. err = b.conn.Close()
  111. if err == nil {
  112. Logger.Printf("Closed connection to broker %s\n", b.addr)
  113. } else {
  114. Logger.Printf("Error while closing connection to broker %s: %s\n", b.addr, err)
  115. }
  116. b.conn = nil
  117. atomic.StoreInt32(&b.opened, 0)
  118. return
  119. }
  120. }
  121. b.done = make(chan bool)
  122. b.responses = make(chan responsePromise, b.conf.Net.MaxOpenRequests-1)
  123. if b.id >= 0 {
  124. Logger.Printf("Connected to broker at %s (registered as #%d)\n", b.addr, b.id)
  125. } else {
  126. Logger.Printf("Connected to broker at %s (unregistered)\n", b.addr)
  127. }
  128. go withRecover(b.responseReceiver)
  129. })
  130. return nil
  131. }
  132. // Connected returns true if the broker is connected and false otherwise. If the broker is not
  133. // connected but it had tried to connect, the error from that connection attempt is also returned.
  134. func (b *Broker) Connected() (bool, error) {
  135. b.lock.Lock()
  136. defer b.lock.Unlock()
  137. return b.conn != nil, b.connErr
  138. }
  139. func (b *Broker) Close() error {
  140. b.lock.Lock()
  141. defer b.lock.Unlock()
  142. if b.conn == nil {
  143. return ErrNotConnected
  144. }
  145. close(b.responses)
  146. <-b.done
  147. err := b.conn.Close()
  148. b.conn = nil
  149. b.connErr = nil
  150. b.done = nil
  151. b.responses = nil
  152. if err == nil {
  153. Logger.Printf("Closed connection to broker %s\n", b.addr)
  154. } else {
  155. Logger.Printf("Error while closing connection to broker %s: %s\n", b.addr, err)
  156. }
  157. atomic.StoreInt32(&b.opened, 0)
  158. return err
  159. }
  160. // ID returns the broker ID retrieved from Kafka's metadata, or -1 if that is not known.
  161. func (b *Broker) ID() int32 {
  162. return b.id
  163. }
  164. // Addr returns the broker address as either retrieved from Kafka's metadata or passed to NewBroker.
  165. func (b *Broker) Addr() string {
  166. return b.addr
  167. }
  168. func (b *Broker) GetMetadata(request *MetadataRequest) (*MetadataResponse, error) {
  169. response := new(MetadataResponse)
  170. err := b.sendAndReceive(request, response)
  171. if err != nil {
  172. return nil, err
  173. }
  174. return response, nil
  175. }
  176. func (b *Broker) GetConsumerMetadata(request *ConsumerMetadataRequest) (*ConsumerMetadataResponse, error) {
  177. response := new(ConsumerMetadataResponse)
  178. err := b.sendAndReceive(request, response)
  179. if err != nil {
  180. return nil, err
  181. }
  182. return response, nil
  183. }
  184. func (b *Broker) GetAvailableOffsets(request *OffsetRequest) (*OffsetResponse, error) {
  185. response := new(OffsetResponse)
  186. err := b.sendAndReceive(request, response)
  187. if err != nil {
  188. return nil, err
  189. }
  190. return response, nil
  191. }
  192. func (b *Broker) Produce(request *ProduceRequest) (*ProduceResponse, error) {
  193. var response *ProduceResponse
  194. var err error
  195. if request.RequiredAcks == NoResponse {
  196. err = b.sendAndReceive(request, nil)
  197. } else {
  198. response = new(ProduceResponse)
  199. err = b.sendAndReceive(request, response)
  200. }
  201. if err != nil {
  202. return nil, err
  203. }
  204. return response, nil
  205. }
  206. func (b *Broker) Fetch(request *FetchRequest) (*FetchResponse, error) {
  207. response := new(FetchResponse)
  208. err := b.sendAndReceive(request, response)
  209. if err != nil {
  210. return nil, err
  211. }
  212. return response, nil
  213. }
  214. func (b *Broker) CommitOffset(request *OffsetCommitRequest) (*OffsetCommitResponse, error) {
  215. response := new(OffsetCommitResponse)
  216. err := b.sendAndReceive(request, response)
  217. if err != nil {
  218. return nil, err
  219. }
  220. return response, nil
  221. }
  222. func (b *Broker) FetchOffset(request *OffsetFetchRequest) (*OffsetFetchResponse, error) {
  223. response := new(OffsetFetchResponse)
  224. err := b.sendAndReceive(request, response)
  225. if err != nil {
  226. return nil, err
  227. }
  228. return response, nil
  229. }
  230. func (b *Broker) JoinGroup(request *JoinGroupRequest) (*JoinGroupResponse, error) {
  231. response := new(JoinGroupResponse)
  232. err := b.sendAndReceive(request, response)
  233. if err != nil {
  234. return nil, err
  235. }
  236. return response, nil
  237. }
  238. func (b *Broker) SyncGroup(request *SyncGroupRequest) (*SyncGroupResponse, error) {
  239. response := new(SyncGroupResponse)
  240. err := b.sendAndReceive(request, response)
  241. if err != nil {
  242. return nil, err
  243. }
  244. return response, nil
  245. }
  246. func (b *Broker) LeaveGroup(request *LeaveGroupRequest) (*LeaveGroupResponse, error) {
  247. response := new(LeaveGroupResponse)
  248. err := b.sendAndReceive(request, response)
  249. if err != nil {
  250. return nil, err
  251. }
  252. return response, nil
  253. }
  254. func (b *Broker) Heartbeat(request *HeartbeatRequest) (*HeartbeatResponse, error) {
  255. response := new(HeartbeatResponse)
  256. err := b.sendAndReceive(request, response)
  257. if err != nil {
  258. return nil, err
  259. }
  260. return response, nil
  261. }
  262. func (b *Broker) ListGroups(request *ListGroupsRequest) (*ListGroupsResponse, error) {
  263. response := new(ListGroupsResponse)
  264. err := b.sendAndReceive(request, response)
  265. if err != nil {
  266. return nil, err
  267. }
  268. return response, nil
  269. }
  270. func (b *Broker) DescribeGroups(request *DescribeGroupsRequest) (*DescribeGroupsResponse, error) {
  271. response := new(DescribeGroupsResponse)
  272. err := b.sendAndReceive(request, response)
  273. if err != nil {
  274. return nil, err
  275. }
  276. return response, nil
  277. }
  278. func (b *Broker) send(rb protocolBody, promiseResponse bool) (*responsePromise, error) {
  279. b.lock.Lock()
  280. defer b.lock.Unlock()
  281. if b.conn == nil {
  282. if b.connErr != nil {
  283. return nil, b.connErr
  284. }
  285. return nil, ErrNotConnected
  286. }
  287. if !b.conf.Version.IsAtLeast(rb.requiredVersion()) {
  288. return nil, ErrUnsupportedVersion
  289. }
  290. req := &request{correlationID: b.correlationID, clientID: b.conf.ClientID, body: rb}
  291. buf, err := encode(req, b.conf.MetricRegistry)
  292. if err != nil {
  293. return nil, err
  294. }
  295. err = b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout))
  296. if err != nil {
  297. return nil, err
  298. }
  299. requestTime := time.Now()
  300. bytes, err := b.conn.Write(buf)
  301. b.updateOutgoingCommunicationMetrics(bytes)
  302. if err != nil {
  303. return nil, err
  304. }
  305. b.correlationID++
  306. if !promiseResponse {
  307. // Record request latency without the response
  308. b.updateRequestLatencyMetrics(time.Since(requestTime))
  309. return nil, nil
  310. }
  311. promise := responsePromise{requestTime, req.correlationID, make(chan []byte), make(chan error)}
  312. b.responses <- promise
  313. return &promise, nil
  314. }
  315. func (b *Broker) sendAndReceive(req protocolBody, res versionedDecoder) error {
  316. promise, err := b.send(req, res != nil)
  317. if err != nil {
  318. return err
  319. }
  320. if promise == nil {
  321. return nil
  322. }
  323. select {
  324. case buf := <-promise.packets:
  325. return versionedDecode(buf, res, req.version())
  326. case err = <-promise.errors:
  327. return err
  328. }
  329. }
  330. func (b *Broker) decode(pd packetDecoder) (err error) {
  331. b.id, err = pd.getInt32()
  332. if err != nil {
  333. return err
  334. }
  335. host, err := pd.getString()
  336. if err != nil {
  337. return err
  338. }
  339. port, err := pd.getInt32()
  340. if err != nil {
  341. return err
  342. }
  343. b.addr = net.JoinHostPort(host, fmt.Sprint(port))
  344. if _, _, err := net.SplitHostPort(b.addr); err != nil {
  345. return err
  346. }
  347. return nil
  348. }
  349. func (b *Broker) encode(pe packetEncoder) (err error) {
  350. host, portstr, err := net.SplitHostPort(b.addr)
  351. if err != nil {
  352. return err
  353. }
  354. port, err := strconv.Atoi(portstr)
  355. if err != nil {
  356. return err
  357. }
  358. pe.putInt32(b.id)
  359. err = pe.putString(host)
  360. if err != nil {
  361. return err
  362. }
  363. pe.putInt32(int32(port))
  364. return nil
  365. }
  366. func (b *Broker) responseReceiver() {
  367. var dead error
  368. header := make([]byte, 8)
  369. for response := range b.responses {
  370. if dead != nil {
  371. response.errors <- dead
  372. continue
  373. }
  374. err := b.conn.SetReadDeadline(time.Now().Add(b.conf.Net.ReadTimeout))
  375. if err != nil {
  376. dead = err
  377. response.errors <- err
  378. continue
  379. }
  380. bytesReadHeader, err := io.ReadFull(b.conn, header)
  381. requestLatency := time.Since(response.requestTime)
  382. if err != nil {
  383. b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
  384. dead = err
  385. response.errors <- err
  386. continue
  387. }
  388. decodedHeader := responseHeader{}
  389. err = decode(header, &decodedHeader)
  390. if err != nil {
  391. b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
  392. dead = err
  393. response.errors <- err
  394. continue
  395. }
  396. if decodedHeader.correlationID != response.correlationID {
  397. b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
  398. // TODO if decoded ID < cur ID, discard until we catch up
  399. // TODO if decoded ID > cur ID, save it so when cur ID catches up we have a response
  400. dead = PacketDecodingError{fmt.Sprintf("correlation ID didn't match, wanted %d, got %d", response.correlationID, decodedHeader.correlationID)}
  401. response.errors <- dead
  402. continue
  403. }
  404. buf := make([]byte, decodedHeader.length-4)
  405. bytesReadBody, err := io.ReadFull(b.conn, buf)
  406. b.updateIncomingCommunicationMetrics(bytesReadHeader+bytesReadBody, requestLatency)
  407. if err != nil {
  408. dead = err
  409. response.errors <- err
  410. continue
  411. }
  412. response.packets <- buf
  413. }
  414. close(b.done)
  415. }
  416. func (b *Broker) sendAndReceiveSASLPlainHandshake() error {
  417. rb := &SaslHandshakeRequest{"PLAIN"}
  418. req := &request{correlationID: b.correlationID, clientID: b.conf.ClientID, body: rb}
  419. buf, err := encode(req, b.conf.MetricRegistry)
  420. if err != nil {
  421. return err
  422. }
  423. err = b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout))
  424. if err != nil {
  425. return err
  426. }
  427. requestTime := time.Now()
  428. bytes, err := b.conn.Write(buf)
  429. b.updateOutgoingCommunicationMetrics(bytes)
  430. if err != nil {
  431. Logger.Printf("Failed to send SASL handshake %s: %s\n", b.addr, err.Error())
  432. return err
  433. }
  434. b.correlationID++
  435. //wait for the response
  436. header := make([]byte, 8) // response header
  437. _, err = io.ReadFull(b.conn, header)
  438. if err != nil {
  439. Logger.Printf("Failed to read SASL handshake header : %s\n", err.Error())
  440. return err
  441. }
  442. length := binary.BigEndian.Uint32(header[:4])
  443. payload := make([]byte, length-4)
  444. n, err := io.ReadFull(b.conn, payload)
  445. if err != nil {
  446. Logger.Printf("Failed to read SASL handshake payload : %s\n", err.Error())
  447. return err
  448. }
  449. b.updateIncomingCommunicationMetrics(n+8, time.Since(requestTime))
  450. res := &SaslHandshakeResponse{}
  451. err = versionedDecode(payload, res, 0)
  452. if err != nil {
  453. Logger.Printf("Failed to parse SASL handshake : %s\n", err.Error())
  454. return err
  455. }
  456. if res.Err != ErrNoError {
  457. Logger.Printf("Invalid SASL Mechanism : %s\n", res.Err.Error())
  458. return res.Err
  459. }
  460. Logger.Print("Successful SASL handshake")
  461. return nil
  462. }
  463. // Kafka 0.10.0 plans to support SASL Plain and Kerberos as per PR #812 (KIP-43)/(JIRA KAFKA-3149)
  464. // Some hosted kafka services such as IBM Message Hub already offer SASL/PLAIN auth with Kafka 0.9
  465. //
  466. // In SASL Plain, Kafka expects the auth header to be in the following format
  467. // Message format (from https://tools.ietf.org/html/rfc4616):
  468. //
  469. // message = [authzid] UTF8NUL authcid UTF8NUL passwd
  470. // authcid = 1*SAFE ; MUST accept up to 255 octets
  471. // authzid = 1*SAFE ; MUST accept up to 255 octets
  472. // passwd = 1*SAFE ; MUST accept up to 255 octets
  473. // UTF8NUL = %x00 ; UTF-8 encoded NUL character
  474. //
  475. // SAFE = UTF1 / UTF2 / UTF3 / UTF4
  476. // ;; any UTF-8 encoded Unicode character except NUL
  477. //
  478. // When credentials are valid, Kafka returns a 4 byte array of null characters.
  479. // When credentials are invalid, Kafka closes the connection. This does not seem to be the ideal way
  480. // of responding to bad credentials but thats how its being done today.
  481. func (b *Broker) sendAndReceiveSASLPlainAuth() error {
  482. if b.conf.Net.SASL.Handshake {
  483. handshakeErr := b.sendAndReceiveSASLPlainHandshake()
  484. if handshakeErr != nil {
  485. Logger.Printf("Error while performing SASL handshake %s\n", b.addr)
  486. return handshakeErr
  487. }
  488. }
  489. length := 1 + len(b.conf.Net.SASL.User) + 1 + len(b.conf.Net.SASL.Password)
  490. authBytes := make([]byte, length+4) //4 byte length header + auth data
  491. binary.BigEndian.PutUint32(authBytes, uint32(length))
  492. copy(authBytes[4:], []byte("\x00"+b.conf.Net.SASL.User+"\x00"+b.conf.Net.SASL.Password))
  493. err := b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout))
  494. if err != nil {
  495. Logger.Printf("Failed to set write deadline when doing SASL auth with broker %s: %s\n", b.addr, err.Error())
  496. return err
  497. }
  498. requestTime := time.Now()
  499. bytesWritten, err := b.conn.Write(authBytes)
  500. b.updateOutgoingCommunicationMetrics(bytesWritten)
  501. if err != nil {
  502. Logger.Printf("Failed to write SASL auth header to broker %s: %s\n", b.addr, err.Error())
  503. return err
  504. }
  505. header := make([]byte, 4)
  506. n, err := io.ReadFull(b.conn, header)
  507. b.updateIncomingCommunicationMetrics(n, time.Since(requestTime))
  508. // If the credentials are valid, we would get a 4 byte response filled with null characters.
  509. // Otherwise, the broker closes the connection and we get an EOF
  510. if err != nil {
  511. Logger.Printf("Failed to read response while authenticating with SASL to broker %s: %s\n", b.addr, err.Error())
  512. return err
  513. }
  514. Logger.Printf("SASL authentication successful with broker %s:%v - %v\n", b.addr, n, header)
  515. return nil
  516. }
  517. func (b *Broker) updateIncomingCommunicationMetrics(bytes int, requestLatency time.Duration) {
  518. b.updateRequestLatencyMetrics(requestLatency)
  519. b.responseRate.Mark(1)
  520. if b.brokerResponseRate != nil {
  521. b.brokerResponseRate.Mark(1)
  522. }
  523. responseSize := int64(bytes)
  524. b.incomingByteRate.Mark(responseSize)
  525. if b.brokerIncomingByteRate != nil {
  526. b.brokerIncomingByteRate.Mark(responseSize)
  527. }
  528. b.responseSize.Update(responseSize)
  529. if b.brokerResponseSize != nil {
  530. b.brokerResponseSize.Update(responseSize)
  531. }
  532. }
  533. func (b *Broker) updateRequestLatencyMetrics(requestLatency time.Duration) {
  534. requestLatencyInMs := int64(requestLatency / time.Millisecond)
  535. b.requestLatency.Update(requestLatencyInMs)
  536. if b.brokerRequestLatency != nil {
  537. b.brokerRequestLatency.Update(requestLatencyInMs)
  538. }
  539. }
  540. func (b *Broker) updateOutgoingCommunicationMetrics(bytes int) {
  541. b.requestRate.Mark(1)
  542. if b.brokerRequestRate != nil {
  543. b.brokerRequestRate.Mark(1)
  544. }
  545. requestSize := int64(bytes)
  546. b.outgoingByteRate.Mark(requestSize)
  547. if b.brokerOutgoingByteRate != nil {
  548. b.brokerOutgoingByteRate.Mark(requestSize)
  549. }
  550. b.requestSize.Update(requestSize)
  551. if b.brokerRequestSize != nil {
  552. b.brokerRequestSize.Update(requestSize)
  553. }
  554. }