broker_test.go 12 KB


  1. package sarama
  2. import (
  3. "errors"
  4. "fmt"
  5. "net"
  6. "testing"
  7. "time"
  8. metrics "github.com/rcrowley/go-metrics"
  9. )
  10. func ExampleBroker() {
  11. broker := NewBroker("localhost:9092")
  12. err := broker.Open(nil)
  13. if err != nil {
  14. panic(err)
  15. }
  16. request := MetadataRequest{Topics: []string{"myTopic"}}
  17. response, err := broker.GetMetadata(&request)
  18. if err != nil {
  19. _ = broker.Close()
  20. panic(err)
  21. }
  22. fmt.Println("There are", len(response.Topics), "topics active in the cluster.")
  23. if err = broker.Close(); err != nil {
  24. panic(err)
  25. }
  26. }
  27. type mockEncoder struct {
  28. bytes []byte
  29. }
  30. func (m mockEncoder) encode(pe packetEncoder) error {
  31. return pe.putRawBytes(m.bytes)
  32. }
  33. type brokerMetrics struct {
  34. bytesRead int
  35. bytesWritten int
  36. }
  37. func TestBrokerAccessors(t *testing.T) {
  38. broker := NewBroker("abc:123")
  39. if broker.ID() != -1 {
  40. t.Error("New broker didn't have an ID of -1.")
  41. }
  42. if broker.Addr() != "abc:123" {
  43. t.Error("New broker didn't have the correct address")
  44. }
  45. if broker.Rack() != "" {
  46. t.Error("New broker didn't have an unknown rack.")
  47. }
  48. broker.id = 34
  49. if broker.ID() != 34 {
  50. t.Error("Manually setting broker ID did not take effect.")
  51. }
  52. rack := "dc1"
  53. broker.rack = &rack
  54. if broker.Rack() != rack {
  55. t.Error("Manually setting broker rack did not take effect.")
  56. }
  57. }
  58. func TestSimpleBrokerCommunication(t *testing.T) {
  59. for _, tt := range brokerTestTable {
  60. Logger.Printf("Testing broker communication for %s", tt.name)
  61. mb := NewMockBroker(t, 0)
  62. mb.Returns(&mockEncoder{tt.response})
  63. pendingNotify := make(chan brokerMetrics)
  64. // Register a callback to be notified about successful requests
  65. mb.SetNotifier(func(bytesRead, bytesWritten int) {
  66. pendingNotify <- brokerMetrics{bytesRead, bytesWritten}
  67. })
  68. broker := NewBroker(mb.Addr())
  69. // Set the broker id in order to validate local broker metrics
  70. broker.id = 0
  71. conf := NewConfig()
  72. conf.Version = tt.version
  73. err := broker.Open(conf)
  74. if err != nil {
  75. t.Fatal(err)
  76. }
  77. tt.runner(t, broker)
  78. // Wait up to 500 ms for the remote broker to process the request and
  79. // notify us about the metrics
  80. timeout := 500 * time.Millisecond
  81. select {
  82. case mockBrokerMetrics := <-pendingNotify:
  83. validateBrokerMetrics(t, broker, mockBrokerMetrics)
  84. case <-time.After(timeout):
  85. t.Errorf("No request received for: %s after waiting for %v", tt.name, timeout)
  86. }
  87. mb.Close()
  88. err = broker.Close()
  89. if err != nil {
  90. t.Error(err)
  91. }
  92. }
  93. }
  94. var ErrTokenFailure = errors.New("Failure generating token")
  95. type TokenProvider struct {
  96. accessToken string
  97. err error
  98. }
  99. func (t *TokenProvider) Token() (string, error) {
  100. return t.accessToken, t.err
  101. }
  102. func newTokenProvider(accessToken string, err error) *TokenProvider {
  103. return &TokenProvider{
  104. accessToken: accessToken,
  105. err: err,
  106. }
  107. }
  108. func TestReceiveSASLOAuthBearerClientResponse(t *testing.T) {
  109. testTable := []struct {
  110. name string
  111. err error
  112. tokProvider *TokenProvider
  113. }{
  114. {"OK server response",
  115. nil,
  116. newTokenProvider("access-token-123", nil),
  117. },
  118. {"SASL authentication failure response",
  119. ErrSASLAuthenticationFailed,
  120. newTokenProvider("access-token-123", nil),
  121. },
  122. {"Token generation error",
  123. ErrTokenFailure,
  124. newTokenProvider("access-token-123", ErrTokenFailure),
  125. },
  126. }
  127. for i, test := range testTable {
  128. // mockBroker mocks underlying network logic and broker responses
  129. mockBroker := NewMockBroker(t, 0)
  130. mockSASLAuthResponse := NewMockSaslAuthenticateResponse(t)
  131. if e, ok := test.err.(KError); ok {
  132. mockSASLAuthResponse = mockSASLAuthResponse.SetError(e)
  133. }
  134. mockBroker.SetHandlerByMap(map[string]MockResponse{
  135. "SaslAuthenticateRequest": mockSASLAuthResponse,
  136. "SaslHandshakeRequest": NewMockSaslHandshakeResponse(t).
  137. SetEnabledMechanisms([]string{SASLTypeOAuth}),
  138. })
  139. // broker executes SASL requests against mockBroker
  140. broker := NewBroker(mockBroker.Addr())
  141. broker.requestRate = metrics.NilMeter{}
  142. broker.outgoingByteRate = metrics.NilMeter{}
  143. broker.incomingByteRate = metrics.NilMeter{}
  144. broker.requestSize = metrics.NilHistogram{}
  145. broker.responseSize = metrics.NilHistogram{}
  146. broker.responseRate = metrics.NilMeter{}
  147. broker.requestLatency = metrics.NilHistogram{}
  148. conf := NewConfig()
  149. broker.conf = conf
  150. dialer := net.Dialer{
  151. Timeout: conf.Net.DialTimeout,
  152. KeepAlive: conf.Net.KeepAlive,
  153. LocalAddr: conf.Net.LocalAddr,
  154. }
  155. conn, err := dialer.Dial("tcp", mockBroker.listener.Addr().String())
  156. if err != nil {
  157. t.Fatal(err)
  158. }
  159. broker.conn = conn
  160. err = broker.sendAndReceiveSASLOAuth(test.tokProvider)
  161. if test.err != err {
  162. t.Errorf("[%d]:[%s] Expected %s error, got %s\n", i, test.name, test.err, err)
  163. }
  164. mockBroker.Close()
  165. }
  166. }
  167. // We're not testing encoding/decoding here, so most of the requests/responses will be empty for simplicity's sake
  168. var brokerTestTable = []struct {
  169. version KafkaVersion
  170. name string
  171. response []byte
  172. runner func(*testing.T, *Broker)
  173. }{
  174. {V0_10_0_0,
  175. "MetadataRequest",
  176. []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  177. func(t *testing.T, broker *Broker) {
  178. request := MetadataRequest{}
  179. response, err := broker.GetMetadata(&request)
  180. if err != nil {
  181. t.Error(err)
  182. }
  183. if response == nil {
  184. t.Error("Metadata request got no response!")
  185. }
  186. }},
  187. {V0_10_0_0,
  188. "ConsumerMetadataRequest",
  189. []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 't', 0x00, 0x00, 0x00, 0x00},
  190. func(t *testing.T, broker *Broker) {
  191. request := ConsumerMetadataRequest{}
  192. response, err := broker.GetConsumerMetadata(&request)
  193. if err != nil {
  194. t.Error(err)
  195. }
  196. if response == nil {
  197. t.Error("Consumer Metadata request got no response!")
  198. }
  199. }},
  200. {V0_10_0_0,
  201. "ProduceRequest (NoResponse)",
  202. []byte{},
  203. func(t *testing.T, broker *Broker) {
  204. request := ProduceRequest{}
  205. request.RequiredAcks = NoResponse
  206. response, err := broker.Produce(&request)
  207. if err != nil {
  208. t.Error(err)
  209. }
  210. if response != nil {
  211. t.Error("Produce request with NoResponse got a response!")
  212. }
  213. }},
  214. {V0_10_0_0,
  215. "ProduceRequest (WaitForLocal)",
  216. []byte{0x00, 0x00, 0x00, 0x00},
  217. func(t *testing.T, broker *Broker) {
  218. request := ProduceRequest{}
  219. request.RequiredAcks = WaitForLocal
  220. response, err := broker.Produce(&request)
  221. if err != nil {
  222. t.Error(err)
  223. }
  224. if response == nil {
  225. t.Error("Produce request without NoResponse got no response!")
  226. }
  227. }},
  228. {V0_10_0_0,
  229. "FetchRequest",
  230. []byte{0x00, 0x00, 0x00, 0x00},
  231. func(t *testing.T, broker *Broker) {
  232. request := FetchRequest{}
  233. response, err := broker.Fetch(&request)
  234. if err != nil {
  235. t.Error(err)
  236. }
  237. if response == nil {
  238. t.Error("Fetch request got no response!")
  239. }
  240. }},
  241. {V0_10_0_0,
  242. "OffsetFetchRequest",
  243. []byte{0x00, 0x00, 0x00, 0x00},
  244. func(t *testing.T, broker *Broker) {
  245. request := OffsetFetchRequest{}
  246. response, err := broker.FetchOffset(&request)
  247. if err != nil {
  248. t.Error(err)
  249. }
  250. if response == nil {
  251. t.Error("OffsetFetch request got no response!")
  252. }
  253. }},
  254. {V0_10_0_0,
  255. "OffsetCommitRequest",
  256. []byte{0x00, 0x00, 0x00, 0x00},
  257. func(t *testing.T, broker *Broker) {
  258. request := OffsetCommitRequest{}
  259. response, err := broker.CommitOffset(&request)
  260. if err != nil {
  261. t.Error(err)
  262. }
  263. if response == nil {
  264. t.Error("OffsetCommit request got no response!")
  265. }
  266. }},
  267. {V0_10_0_0,
  268. "OffsetRequest",
  269. []byte{0x00, 0x00, 0x00, 0x00},
  270. func(t *testing.T, broker *Broker) {
  271. request := OffsetRequest{}
  272. response, err := broker.GetAvailableOffsets(&request)
  273. if err != nil {
  274. t.Error(err)
  275. }
  276. if response == nil {
  277. t.Error("Offset request got no response!")
  278. }
  279. }},
  280. {V0_10_0_0,
  281. "JoinGroupRequest",
  282. []byte{0x00, 0x17, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  283. func(t *testing.T, broker *Broker) {
  284. request := JoinGroupRequest{}
  285. response, err := broker.JoinGroup(&request)
  286. if err != nil {
  287. t.Error(err)
  288. }
  289. if response == nil {
  290. t.Error("JoinGroup request got no response!")
  291. }
  292. }},
  293. {V0_10_0_0,
  294. "SyncGroupRequest",
  295. []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  296. func(t *testing.T, broker *Broker) {
  297. request := SyncGroupRequest{}
  298. response, err := broker.SyncGroup(&request)
  299. if err != nil {
  300. t.Error(err)
  301. }
  302. if response == nil {
  303. t.Error("SyncGroup request got no response!")
  304. }
  305. }},
  306. {V0_10_0_0,
  307. "LeaveGroupRequest",
  308. []byte{0x00, 0x00},
  309. func(t *testing.T, broker *Broker) {
  310. request := LeaveGroupRequest{}
  311. response, err := broker.LeaveGroup(&request)
  312. if err != nil {
  313. t.Error(err)
  314. }
  315. if response == nil {
  316. t.Error("LeaveGroup request got no response!")
  317. }
  318. }},
  319. {V0_10_0_0,
  320. "HeartbeatRequest",
  321. []byte{0x00, 0x00},
  322. func(t *testing.T, broker *Broker) {
  323. request := HeartbeatRequest{}
  324. response, err := broker.Heartbeat(&request)
  325. if err != nil {
  326. t.Error(err)
  327. }
  328. if response == nil {
  329. t.Error("Heartbeat request got no response!")
  330. }
  331. }},
  332. {V0_10_0_0,
  333. "ListGroupsRequest",
  334. []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  335. func(t *testing.T, broker *Broker) {
  336. request := ListGroupsRequest{}
  337. response, err := broker.ListGroups(&request)
  338. if err != nil {
  339. t.Error(err)
  340. }
  341. if response == nil {
  342. t.Error("ListGroups request got no response!")
  343. }
  344. }},
  345. {V0_10_0_0,
  346. "DescribeGroupsRequest",
  347. []byte{0x00, 0x00, 0x00, 0x00},
  348. func(t *testing.T, broker *Broker) {
  349. request := DescribeGroupsRequest{}
  350. response, err := broker.DescribeGroups(&request)
  351. if err != nil {
  352. t.Error(err)
  353. }
  354. if response == nil {
  355. t.Error("DescribeGroups request got no response!")
  356. }
  357. }},
  358. {V0_10_0_0,
  359. "ApiVersionsRequest",
  360. []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  361. func(t *testing.T, broker *Broker) {
  362. request := ApiVersionsRequest{}
  363. response, err := broker.ApiVersions(&request)
  364. if err != nil {
  365. t.Error(err)
  366. }
  367. if response == nil {
  368. t.Error("ApiVersions request got no response!")
  369. }
  370. }},
  371. {V1_1_0_0,
  372. "DeleteGroupsRequest",
  373. []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
  374. func(t *testing.T, broker *Broker) {
  375. request := DeleteGroupsRequest{}
  376. response, err := broker.DeleteGroups(&request)
  377. if err != nil {
  378. t.Error(err)
  379. }
  380. if response == nil {
  381. t.Error("DeleteGroups request got no response!")
  382. }
  383. }},
  384. }
  385. func validateBrokerMetrics(t *testing.T, broker *Broker, mockBrokerMetrics brokerMetrics) {
  386. metricValidators := newMetricValidators()
  387. mockBrokerBytesRead := mockBrokerMetrics.bytesRead
  388. mockBrokerBytesWritten := mockBrokerMetrics.bytesWritten
  389. // Check that the number of bytes sent corresponds to what the mock broker received
  390. metricValidators.registerForAllBrokers(broker, countMeterValidator("incoming-byte-rate", mockBrokerBytesWritten))
  391. if mockBrokerBytesWritten == 0 {
  392. // This a ProduceRequest with NoResponse
  393. metricValidators.registerForAllBrokers(broker, countMeterValidator("response-rate", 0))
  394. metricValidators.registerForAllBrokers(broker, countHistogramValidator("response-size", 0))
  395. metricValidators.registerForAllBrokers(broker, minMaxHistogramValidator("response-size", 0, 0))
  396. } else {
  397. metricValidators.registerForAllBrokers(broker, countMeterValidator("response-rate", 1))
  398. metricValidators.registerForAllBrokers(broker, countHistogramValidator("response-size", 1))
  399. metricValidators.registerForAllBrokers(broker, minMaxHistogramValidator("response-size", mockBrokerBytesWritten, mockBrokerBytesWritten))
  400. }
  401. // Check that the number of bytes received corresponds to what the mock broker sent
  402. metricValidators.registerForAllBrokers(broker, countMeterValidator("outgoing-byte-rate", mockBrokerBytesRead))
  403. metricValidators.registerForAllBrokers(broker, countMeterValidator("request-rate", 1))
  404. metricValidators.registerForAllBrokers(broker, countHistogramValidator("request-size", 1))
  405. metricValidators.registerForAllBrokers(broker, minMaxHistogramValidator("request-size", mockBrokerBytesRead, mockBrokerBytesRead))
  406. // Run the validators
  407. metricValidators.run(t, broker.conf.MetricRegistry)
  408. }