broker.go 15 KB


  1. package sarama
  2. import (
  3. "crypto/tls"
  4. "encoding/binary"
  5. "fmt"
  6. "io"
  7. "net"
  8. "strconv"
  9. "sync"
  10. "sync/atomic"
  11. "time"
  12. "github.com/rcrowley/go-metrics"
  13. )
  14. // Broker represents a single Kafka broker connection. All operations on this object are entirely concurrency-safe.
  15. type Broker struct {
  16. id int32
  17. addr string
  18. conf *Config
  19. correlationID int32
  20. conn net.Conn
  21. connErr error
  22. lock sync.Mutex
  23. opened int32
  24. responses chan responsePromise
  25. done chan bool
  26. incomingByteRate metrics.Meter
  27. requestRate metrics.Meter
  28. requestSize metrics.Histogram
  29. outgoingByteRate metrics.Meter
  30. responseRate metrics.Meter
  31. responseSize metrics.Histogram
  32. brokerIncomingByteRate metrics.Meter
  33. brokerRequestRate metrics.Meter
  34. brokerRequestSize metrics.Histogram
  35. brokerOutgoingByteRate metrics.Meter
  36. brokerResponseRate metrics.Meter
  37. brokerResponseSize metrics.Histogram
  38. }
  39. type responsePromise struct {
  40. correlationID int32
  41. packets chan []byte
  42. errors chan error
  43. }
  44. // NewBroker creates and returns a Broker targetting the given host:port address.
  45. // This does not attempt to actually connect, you have to call Open() for that.
  46. func NewBroker(addr string) *Broker {
  47. return &Broker{id: -1, addr: addr}
  48. }
  49. // Open tries to connect to the Broker if it is not already connected or connecting, but does not block
  50. // waiting for the connection to complete. This means that any subsequent operations on the broker will
  51. // block waiting for the connection to succeed or fail. To get the effect of a fully synchronous Open call,
  52. // follow it by a call to Connected(). The only errors Open will return directly are ConfigurationError or
  53. // AlreadyConnected. If conf is nil, the result of NewConfig() is used.
  54. func (b *Broker) Open(conf *Config) error {
  55. if !atomic.CompareAndSwapInt32(&b.opened, 0, 1) {
  56. return ErrAlreadyConnected
  57. }
  58. if conf == nil {
  59. conf = NewConfig()
  60. }
  61. err := conf.Validate()
  62. if err != nil {
  63. return err
  64. }
  65. b.lock.Lock()
  66. go withRecover(func() {
  67. defer b.lock.Unlock()
  68. dialer := net.Dialer{
  69. Timeout: conf.Net.DialTimeout,
  70. KeepAlive: conf.Net.KeepAlive,
  71. }
  72. if conf.Net.TLS.Enable {
  73. b.conn, b.connErr = tls.DialWithDialer(&dialer, "tcp", b.addr, conf.Net.TLS.Config)
  74. } else {
  75. b.conn, b.connErr = dialer.Dial("tcp", b.addr)
  76. }
  77. if b.connErr != nil {
  78. Logger.Printf("Failed to connect to broker %s: %s\n", b.addr, b.connErr)
  79. b.conn = nil
  80. atomic.StoreInt32(&b.opened, 0)
  81. return
  82. }
  83. b.conn = newBufConn(b.conn)
  84. b.conf = conf
  85. // Create or reuse the global metrics shared between brokers
  86. b.incomingByteRate = metrics.GetOrRegisterMeter("incoming-byte-rate", conf.MetricRegistry)
  87. b.requestRate = metrics.GetOrRegisterMeter("request-rate", conf.MetricRegistry)
  88. b.requestSize = getOrRegisterHistogram("request-size", conf.MetricRegistry)
  89. b.outgoingByteRate = metrics.GetOrRegisterMeter("outgoing-byte-rate", conf.MetricRegistry)
  90. b.responseRate = metrics.GetOrRegisterMeter("response-rate", conf.MetricRegistry)
  91. b.responseSize = getOrRegisterHistogram("response-size", conf.MetricRegistry)
  92. // Do not gather metrics for seeded broker (only used during bootstrap) because they share
  93. // the same id (-1) and are already exposed through the global metrics above
  94. if b.id >= 0 {
  95. b.brokerIncomingByteRate = getOrRegisterBrokerMeter("incoming-byte-rate", b, conf.MetricRegistry)
  96. b.brokerRequestRate = getOrRegisterBrokerMeter("request-rate", b, conf.MetricRegistry)
  97. b.brokerRequestSize = getOrRegisterBrokerHistogram("request-size", b, conf.MetricRegistry)
  98. b.brokerOutgoingByteRate = getOrRegisterBrokerMeter("outgoing-byte-rate", b, conf.MetricRegistry)
  99. b.brokerResponseRate = getOrRegisterBrokerMeter("response-rate", b, conf.MetricRegistry)
  100. b.brokerResponseSize = getOrRegisterBrokerHistogram("response-size", b, conf.MetricRegistry)
  101. }
  102. if conf.Net.SASL.Enable {
  103. b.connErr = b.sendAndReceiveSASLPlainAuth()
  104. if b.connErr != nil {
  105. err = b.conn.Close()
  106. if err == nil {
  107. Logger.Printf("Closed connection to broker %s\n", b.addr)
  108. } else {
  109. Logger.Printf("Error while closing connection to broker %s: %s\n", b.addr, err)
  110. }
  111. b.conn = nil
  112. atomic.StoreInt32(&b.opened, 0)
  113. return
  114. }
  115. }
  116. b.done = make(chan bool)
  117. b.responses = make(chan responsePromise, b.conf.Net.MaxOpenRequests-1)
  118. if b.id >= 0 {
  119. Logger.Printf("Connected to broker at %s (registered as #%d)\n", b.addr, b.id)
  120. } else {
  121. Logger.Printf("Connected to broker at %s (unregistered)\n", b.addr)
  122. }
  123. go withRecover(b.responseReceiver)
  124. })
  125. return nil
  126. }
  127. // Connected returns true if the broker is connected and false otherwise. If the broker is not
  128. // connected but it had tried to connect, the error from that connection attempt is also returned.
  129. func (b *Broker) Connected() (bool, error) {
  130. b.lock.Lock()
  131. defer b.lock.Unlock()
  132. return b.conn != nil, b.connErr
  133. }
  134. func (b *Broker) Close() error {
  135. b.lock.Lock()
  136. defer b.lock.Unlock()
  137. if b.conn == nil {
  138. return ErrNotConnected
  139. }
  140. close(b.responses)
  141. <-b.done
  142. err := b.conn.Close()
  143. b.conn = nil
  144. b.connErr = nil
  145. b.done = nil
  146. b.responses = nil
  147. if err == nil {
  148. Logger.Printf("Closed connection to broker %s\n", b.addr)
  149. } else {
  150. Logger.Printf("Error while closing connection to broker %s: %s\n", b.addr, err)
  151. }
  152. atomic.StoreInt32(&b.opened, 0)
  153. return err
  154. }
  155. // ID returns the broker ID retrieved from Kafka's metadata, or -1 if that is not known.
  156. func (b *Broker) ID() int32 {
  157. return b.id
  158. }
  159. // Addr returns the broker address as either retrieved from Kafka's metadata or passed to NewBroker.
  160. func (b *Broker) Addr() string {
  161. return b.addr
  162. }
  163. func (b *Broker) GetMetadata(request *MetadataRequest) (*MetadataResponse, error) {
  164. response := new(MetadataResponse)
  165. err := b.sendAndReceive(request, response)
  166. if err != nil {
  167. return nil, err
  168. }
  169. return response, nil
  170. }
  171. func (b *Broker) GetConsumerMetadata(request *ConsumerMetadataRequest) (*ConsumerMetadataResponse, error) {
  172. response := new(ConsumerMetadataResponse)
  173. err := b.sendAndReceive(request, response)
  174. if err != nil {
  175. return nil, err
  176. }
  177. return response, nil
  178. }
  179. func (b *Broker) GetAvailableOffsets(request *OffsetRequest) (*OffsetResponse, error) {
  180. response := new(OffsetResponse)
  181. err := b.sendAndReceive(request, response)
  182. if err != nil {
  183. return nil, err
  184. }
  185. return response, nil
  186. }
  187. func (b *Broker) Produce(request *ProduceRequest) (*ProduceResponse, error) {
  188. var response *ProduceResponse
  189. var err error
  190. if request.RequiredAcks == NoResponse {
  191. err = b.sendAndReceive(request, nil)
  192. } else {
  193. response = new(ProduceResponse)
  194. err = b.sendAndReceive(request, response)
  195. }
  196. if err != nil {
  197. return nil, err
  198. }
  199. return response, nil
  200. }
  201. func (b *Broker) Fetch(request *FetchRequest) (*FetchResponse, error) {
  202. response := new(FetchResponse)
  203. err := b.sendAndReceive(request, response)
  204. if err != nil {
  205. return nil, err
  206. }
  207. return response, nil
  208. }
  209. func (b *Broker) CommitOffset(request *OffsetCommitRequest) (*OffsetCommitResponse, error) {
  210. response := new(OffsetCommitResponse)
  211. err := b.sendAndReceive(request, response)
  212. if err != nil {
  213. return nil, err
  214. }
  215. return response, nil
  216. }
  217. func (b *Broker) FetchOffset(request *OffsetFetchRequest) (*OffsetFetchResponse, error) {
  218. response := new(OffsetFetchResponse)
  219. err := b.sendAndReceive(request, response)
  220. if err != nil {
  221. return nil, err
  222. }
  223. return response, nil
  224. }
  225. func (b *Broker) JoinGroup(request *JoinGroupRequest) (*JoinGroupResponse, error) {
  226. response := new(JoinGroupResponse)
  227. err := b.sendAndReceive(request, response)
  228. if err != nil {
  229. return nil, err
  230. }
  231. return response, nil
  232. }
  233. func (b *Broker) SyncGroup(request *SyncGroupRequest) (*SyncGroupResponse, error) {
  234. response := new(SyncGroupResponse)
  235. err := b.sendAndReceive(request, response)
  236. if err != nil {
  237. return nil, err
  238. }
  239. return response, nil
  240. }
  241. func (b *Broker) LeaveGroup(request *LeaveGroupRequest) (*LeaveGroupResponse, error) {
  242. response := new(LeaveGroupResponse)
  243. err := b.sendAndReceive(request, response)
  244. if err != nil {
  245. return nil, err
  246. }
  247. return response, nil
  248. }
  249. func (b *Broker) Heartbeat(request *HeartbeatRequest) (*HeartbeatResponse, error) {
  250. response := new(HeartbeatResponse)
  251. err := b.sendAndReceive(request, response)
  252. if err != nil {
  253. return nil, err
  254. }
  255. return response, nil
  256. }
  257. func (b *Broker) ListGroups(request *ListGroupsRequest) (*ListGroupsResponse, error) {
  258. response := new(ListGroupsResponse)
  259. err := b.sendAndReceive(request, response)
  260. if err != nil {
  261. return nil, err
  262. }
  263. return response, nil
  264. }
  265. func (b *Broker) DescribeGroups(request *DescribeGroupsRequest) (*DescribeGroupsResponse, error) {
  266. response := new(DescribeGroupsResponse)
  267. err := b.sendAndReceive(request, response)
  268. if err != nil {
  269. return nil, err
  270. }
  271. return response, nil
  272. }
  273. func (b *Broker) send(rb protocolBody, promiseResponse bool) (*responsePromise, error) {
  274. b.lock.Lock()
  275. defer b.lock.Unlock()
  276. if b.conn == nil {
  277. if b.connErr != nil {
  278. return nil, b.connErr
  279. }
  280. return nil, ErrNotConnected
  281. }
  282. if !b.conf.Version.IsAtLeast(rb.requiredVersion()) {
  283. return nil, ErrUnsupportedVersion
  284. }
  285. req := &request{correlationID: b.correlationID, clientID: b.conf.ClientID, body: rb}
  286. buf, err := encode(req)
  287. if err != nil {
  288. return nil, err
  289. }
  290. err = b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout))
  291. if err != nil {
  292. return nil, err
  293. }
  294. bytes, err := b.conn.Write(buf)
  295. b.updateOutgoingCommunicationMetrics(bytes)
  296. if err != nil {
  297. return nil, err
  298. }
  299. b.correlationID++
  300. if !promiseResponse {
  301. return nil, nil
  302. }
  303. promise := responsePromise{req.correlationID, make(chan []byte), make(chan error)}
  304. b.responses <- promise
  305. return &promise, nil
  306. }
  307. func (b *Broker) sendAndReceive(req protocolBody, res versionedDecoder) error {
  308. promise, err := b.send(req, res != nil)
  309. if err != nil {
  310. return err
  311. }
  312. if promise == nil {
  313. return nil
  314. }
  315. select {
  316. case buf := <-promise.packets:
  317. return versionedDecode(buf, res, req.version())
  318. case err = <-promise.errors:
  319. return err
  320. }
  321. }
  322. func (b *Broker) decode(pd packetDecoder) (err error) {
  323. b.id, err = pd.getInt32()
  324. if err != nil {
  325. return err
  326. }
  327. host, err := pd.getString()
  328. if err != nil {
  329. return err
  330. }
  331. port, err := pd.getInt32()
  332. if err != nil {
  333. return err
  334. }
  335. b.addr = net.JoinHostPort(host, fmt.Sprint(port))
  336. if _, _, err := net.SplitHostPort(b.addr); err != nil {
  337. return err
  338. }
  339. return nil
  340. }
  341. func (b *Broker) encode(pe packetEncoder) (err error) {
  342. host, portstr, err := net.SplitHostPort(b.addr)
  343. if err != nil {
  344. return err
  345. }
  346. port, err := strconv.Atoi(portstr)
  347. if err != nil {
  348. return err
  349. }
  350. pe.putInt32(b.id)
  351. err = pe.putString(host)
  352. if err != nil {
  353. return err
  354. }
  355. pe.putInt32(int32(port))
  356. return nil
  357. }
// responseReceiver is started in its own goroutine at the end of a successful Open. It fulfils
// the promises queued by send one at a time, in queue order. For each promise it sets a read
// deadline, reads an 8-byte response header, checks that the header's correlation ID matches
// the promise, then reads the body and delivers it on the promise's packets channel.
//
// The first failure (deadline, header read/decode, correlation-ID mismatch or body read)
// marks the connection dead. That error goes to the failing promise, and every later queued
// promise receives the same error without further reads. The sends on a promise's channels
// block until the waiting caller receives them (send creates them unbuffered). The loop ends
// once b.responses is closed (Close does this) and drained, after which b.done is closed.
func (b *Broker) responseReceiver() {
	// dead holds the first error seen; once set, the connection is no longer read.
	var dead error
	// Reused for every response header.
	header := make([]byte, 8)
	for response := range b.responses {
		if dead != nil {
			response.errors <- dead
			continue
		}
		err := b.conn.SetReadDeadline(time.Now().Add(b.conf.Net.ReadTimeout))
		if err != nil {
			dead = err
			response.errors <- err
			continue
		}
		bytesReadHeader, err := io.ReadFull(b.conn, header)
		if err != nil {
			// Count the bytes read so far before failing.
			b.updateIncomingCommunicationMetrics(bytesReadHeader)
			dead = err
			response.errors <- err
			continue
		}
		decodedHeader := responseHeader{}
		err = decode(header, &decodedHeader)
		if err != nil {
			b.updateIncomingCommunicationMetrics(bytesReadHeader)
			dead = err
			response.errors <- err
			continue
		}
		if decodedHeader.correlationID != response.correlationID {
			b.updateIncomingCommunicationMetrics(bytesReadHeader)
			// TODO if decoded ID < cur ID, discard until we catch up
			// TODO if decoded ID > cur ID, save it so when cur ID catches up we have a response
			dead = PacketDecodingError{fmt.Sprintf("correlation ID didn't match, wanted %d, got %d", response.correlationID, decodedHeader.correlationID)}
			response.errors <- dead
			continue
		}
		// Presumably the header's length field counts the 4-byte correlation ID already read,
		// hence the -4. NOTE(review): assumes responseHeader decoding rejects length < 4;
		// otherwise this make would panic — confirm.
		buf := make([]byte, decodedHeader.length-4)
		bytesReadBody, err := io.ReadFull(b.conn, buf)
		b.updateIncomingCommunicationMetrics(bytesReadHeader + bytesReadBody)
		if err != nil {
			dead = err
			response.errors <- err
			continue
		}
		response.packets <- buf
	}
	close(b.done)
}
  407. // Kafka 0.10.0 plans to support SASL Plain and Kerberos as per PR #812 (KIP-43)/(JIRA KAFKA-3149)
  408. // Some hosted kafka services such as IBM Message Hub already offer SASL/PLAIN auth with Kafka 0.9
  409. //
  410. // In SASL Plain, Kafka expects the auth header to be in the following format
  411. // Message format (from https://tools.ietf.org/html/rfc4616):
  412. //
  413. // message = [authzid] UTF8NUL authcid UTF8NUL passwd
  414. // authcid = 1*SAFE ; MUST accept up to 255 octets
  415. // authzid = 1*SAFE ; MUST accept up to 255 octets
  416. // passwd = 1*SAFE ; MUST accept up to 255 octets
  417. // UTF8NUL = %x00 ; UTF-8 encoded NUL character
  418. //
  419. // SAFE = UTF1 / UTF2 / UTF3 / UTF4
  420. // ;; any UTF-8 encoded Unicode character except NUL
  421. //
  422. // When credentials are valid, Kafka returns a 4 byte array of null characters.
  423. // When credentials are invalid, Kafka closes the connection. This does not seem to be the ideal way
  424. // of responding to bad credentials but thats how its being done today.
  425. func (b *Broker) sendAndReceiveSASLPlainAuth() error {
  426. length := 1 + len(b.conf.Net.SASL.User) + 1 + len(b.conf.Net.SASL.Password)
  427. authBytes := make([]byte, length+4) //4 byte length header + auth data
  428. binary.BigEndian.PutUint32(authBytes, uint32(length))
  429. copy(authBytes[4:], []byte("\x00"+b.conf.Net.SASL.User+"\x00"+b.conf.Net.SASL.Password))
  430. err := b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout))
  431. if err != nil {
  432. Logger.Printf("Failed to set write deadline when doing SASL auth with broker %s: %s\n", b.addr, err.Error())
  433. return err
  434. }
  435. bytesWritten, err := b.conn.Write(authBytes)
  436. b.updateOutgoingCommunicationMetrics(bytesWritten)
  437. if err != nil {
  438. Logger.Printf("Failed to write SASL auth header to broker %s: %s\n", b.addr, err.Error())
  439. return err
  440. }
  441. header := make([]byte, 4)
  442. n, err := io.ReadFull(b.conn, header)
  443. b.updateIncomingCommunicationMetrics(n)
  444. // If the credentials are valid, we would get a 4 byte response filled with null characters.
  445. // Otherwise, the broker closes the connection and we get an EOF
  446. if err != nil {
  447. Logger.Printf("Failed to read response while authenticating with SASL to broker %s: %s\n", b.addr, err.Error())
  448. return err
  449. }
  450. Logger.Printf("SASL authentication successful with broker %s:%v - %v\n", b.addr, n, header)
  451. return nil
  452. }
  453. func (b *Broker) updateIncomingCommunicationMetrics(bytes int) {
  454. b.responseRate.Mark(1)
  455. if b.brokerResponseRate != nil {
  456. b.brokerResponseRate.Mark(1)
  457. }
  458. responseSize := int64(bytes)
  459. b.incomingByteRate.Mark(responseSize)
  460. if b.brokerIncomingByteRate != nil {
  461. b.brokerIncomingByteRate.Mark(responseSize)
  462. }
  463. b.responseSize.Update(responseSize)
  464. if b.brokerResponseSize != nil {
  465. b.brokerResponseSize.Update(responseSize)
  466. }
  467. }
  468. func (b *Broker) updateOutgoingCommunicationMetrics(bytes int) {
  469. b.requestRate.Mark(1)
  470. if b.brokerRequestRate != nil {
  471. b.brokerRequestRate.Mark(1)
  472. }
  473. requestSize := int64(bytes)
  474. b.outgoingByteRate.Mark(requestSize)
  475. if b.brokerOutgoingByteRate != nil {
  476. b.brokerOutgoingByteRate.Mark(requestSize)
  477. }
  478. b.requestSize.Update(requestSize)
  479. if b.brokerRequestSize != nil {
  480. b.brokerRequestSize.Update(requestSize)
  481. }
  482. }